[Zope-CVS] CVS: Packages/pypes/pypes/tests - test_graph.py:1.8 test_identity.py:1.6

Casey Duncan casey at zope.com
Mon Feb 9 16:14:59 EST 2004


Update of /cvs-repository/Packages/pypes/pypes/tests
In directory cvs.zope.org:/tmp/cvs-serv11692/pypes/tests

Modified Files:
	test_graph.py test_identity.py 
Log Message:
Convert DOS (CRLF) line endings to Unix (LF) line endings


=== Packages/pypes/pypes/tests/test_graph.py 1.7 => 1.8 ===
--- Packages/pypes/pypes/tests/test_graph.py:1.7	Fri Jan 30 00:35:39 2004
+++ Packages/pypes/pypes/tests/test_graph.py	Mon Feb  9 16:14:58 2004
@@ -1,726 +1,726 @@
-##############################################################################
-#
-# Copyright (c) 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Event service tests
-
-$Id$"""
-
-import unittest
-import ZODB
-from random import randint
-from zope.interface.verify import verifyObject
-from persistent import Persistent
-from common import PypesTestCase, PypesPersistentTest, TestClass, sort
-from pypes import graph
-
-
-class TestDirectedGraph(unittest.TestCase):
-        
-    def _newObj(self):
-        try:
-            self.t += 1
-        except AttributeError:
-            self.t = 0L
-        return self.t
-    
-    def setUp(self):
-        self.graph = graph.DirectedGraph()
-
-    def testInterface(self):
-        from pypes.interfaces import IGraph, IGraphNodes, IGraphEdges
-        self.failUnless(verifyObject(IGraph, self.graph))
-        self.failUnless(verifyObject(IGraphNodes, self.graph.nodes))
-        self.failUnless(verifyObject(IGraphEdges, self.graph.edges))
-            
-    def testAddNodes(self):
-        objs = [self._newObj() for i in range(10)]
-        for o in objs:
-            self.failUnless(self.graph.nodes.add(o))
-        for o in objs:
-            self.failIf(self.graph.nodes.add(o))
-        self.assertEqual(len(objs), len(self.graph.nodes))
-        for o in objs:
-            self.failUnless(o in self.graph.nodes)
-
-    def testAddEdges(self):
-        objs = [self._newObj() for i in range(10)]
-        edges = []
-        for s in range(len(objs)):
-            for t in range(len(objs) - s):
-                edges.append((objs[s], objs[t]))
-        for source, target in edges:
-            self.failUnless(self.graph.edges.add(source, target))
-            self.failUnless(source in self.graph.nodes)
-            self.failUnless(target in self.graph.nodes)
-        for source, target in edges:
-            self.failIf(self.graph.edges.add(source, target))
-        for pair in edges:
-            self.failUnless(pair in self.graph.edges)
-            
-    def testAddAndGetValue(self):
-        edges = [(self._newObj(), self._newObj()) for i in range(10)]
-        data = 0
-        for source, target in edges:
-            self.graph.edges.add(source, target, data)
-            data += 1
-        data = 0
-        for source, target in edges:
-            self.assertEqual(self.graph.edges.get(source, target), data)
-            data += 1
-            
-    def testAddPreservesValue(self):
-        source, target = self._newObj(), self._newObj()
-        self.graph.edges.add(source, target, 42)
-        self.graph.edges.add(source, target)
-        self.assertEqual(self.graph.edges.get(source, target), 42)
-    
-    def testGetValueFails(self):
-        from pypes.exceptions import GraphValueError
-        source, target = self._newObj(), self._newObj()
-        self.graph.edges.add(source, target)
-        self.assertRaises(
-            GraphValueError, self.graph.edges.get, source, target)
-        
-    def testGetValueDefault(self):
-        source, target = self._newObj(), self._newObj()
-        self.graph.edges.add(source, target)
-        self.assertEqual(
-            self.graph.edges.get(source, target, 'yadda'), 'yadda')        
-            
-    def testRemoveEdges(self):
-        edges = [(self._newObj(), self._newObj()) for i in range(10)]
-        for source, target in edges:
-            self.graph.edges.add(source, target)
-        for edge in edges:
-            self.graph.edges.remove(*edge)
-            self.failIf(edge in self.graph.edges)
-        self.failIf(self.graph.edges)
-        
-    def testRemoveEdgeLeavesNodes(self):
-        source, target = self._newObj(), self._newObj()
-        self.graph.edges.add(source, target)
-        self.graph.edges.remove(source, target)
-        self.failUnless(source in self.graph.nodes)
-        self.failUnless(target in self.graph.nodes)
-           
-    def testRemoveEdgeFails(self):
-        from pypes.exceptions import GraphLookupError
-        source, target = self._newObj(), self._newObj()
-        self.assertRaises(
-            GraphLookupError, self.graph.edges.remove, source, target)
-            
-    def testRemoveEdgeRemovesValue(self):
-        from pypes.exceptions import GraphLookupError
-        source, target = self._newObj(), self._newObj()
-        self.graph.edges.add(source, target, 101)
-        self.graph.edges.remove(source, target)
-        self.assertRaises(
-            GraphLookupError, self.graph.edges.get, source, target)
-        
-    def testRemoveNode(self):
-        objs = [self._newObj() for i in range(10)]
-        for o in objs:
-            self.graph.nodes.add(o)
-        for o in objs:
-            self.graph.nodes.remove(o)
-            self.failIf(o in self.graph.nodes)
-        self.failIf(self.graph.nodes)
-
-    def testRemoveNodeRemovesEdges(self):
-        obj = [self._newObj() for i in range(5)]
-        self.graph.edges.add(obj[0], obj[1])
-        self.graph.edges.add(obj[0], obj[2])
-        self.graph.edges.add(obj[1], obj[4])
-        self.graph.edges.add(obj[3], obj[1])
-        self.graph.edges.add(obj[4], obj[0])
-        self.graph.nodes.remove(obj[4])
-        self.failIf((obj[4], obj[0]) in self.graph.edges)
-        self.failIf((obj[1], obj[4]) in self.graph.edges)
-        self.failUnless((obj[0], obj[1]) in self.graph.edges)
-        self.failUnless((obj[0], obj[2]) in self.graph.edges)
-        self.failUnless((obj[3], obj[1]) in self.graph.edges)
-        self.graph.nodes.remove(obj[0])
-        self.failIf((obj[0], obj[1]) in self.graph.edges)
-        self.failIf((obj[0], obj[2]) in self.graph.edges)
-        self.failUnless((obj[3], obj[1]) in self.graph.edges)
-        self.assertEqual(len(self.graph.edges), 1)
-        self.assertEqual(len(self.graph.nodes), 3)
-
-    def testRemoveNodeFails(self):
-        from pypes.exceptions import GraphLookupError
-        self.assertRaises(
-            GraphLookupError, self.graph.nodes.remove, self._newObj())        
-    
-    def testGetValueFails(self):
-        from pypes.exceptions import GraphLookupError
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.assertRaises(
-            GraphLookupError, self.graph.edges.get, ob1, ob2)
-        self.graph.edges.add(ob1, ob2)
-        self.assertRaises(
-            GraphLookupError, self.graph.edges.get, ob1, ob3)
-        self.assertRaises(
-            GraphLookupError, self.graph.edges.get, ob3, ob2)
-        self.assertRaises(
-            GraphLookupError, self.graph.edges.get, ob2, ob1)
-    
-    def testAddReplacesValue(self):
-        from pypes.exceptions import GraphValueError
-        ob1, ob2 = self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.assertRaises(
-            GraphValueError, self.graph.edges.get, ob1, ob2)
-        self.graph.edges.add(ob1, ob2, 'data!')
-        self.assertEqual(self.graph.edges.get(ob1, ob2), 'data!')
-        self.graph.edges.add(ob1, ob2)
-        self.assertEqual(self.graph.edges.get(ob1, ob2), 'data!')
-    
-    def testSetValue(self):
-        ob1, ob2 = self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.set(ob1, ob2, '!atad')
-        self.assertEqual(self.graph.edges.get(ob1, ob2), '!atad')
-        
-    def testSetValueFails(self):
-        from pypes.exceptions import GraphLookupError
-        ob1, ob2 = self._newObj(), self._newObj()
-        self.assertRaises(
-            GraphLookupError, self.graph.edges.set, ob1, ob2, 'sorry')
-        
-    def testEdgesContains(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.add(ob2, ob3)
-        self.failUnless((ob1, ob2) in self.graph.edges)
-        self.failIf((ob2, ob1) in self.graph.edges)
-        self.failUnless((ob2, ob3) in self.graph.edges)
-        self.failIf((ob1, ob3) in self.graph.edges)
-        
-    def testIterNodes(self):
-        objs = [self._newObj() for i in range(10)]
-        for o in objs:
-            self.graph.nodes.add(o)
-        fromiter = list(self.graph.nodes)
-        objs.sort()
-        fromiter.sort()
-        self.assertEqual(objs, fromiter)
-    
-    def testIterEdges(self):
-        edges = [(self._newObj(), self._newObj()) for i in range(10)]
-        for source, target in edges:
-            self.graph.edges.add(source, target)
-        fromiter = list(self.graph.edges)
-        edges.sort()
-        fromiter.sort()
-        self.assertEqual(edges, fromiter)
-        
-    def testIterValues(self):
-        edges = [(self._newObj(), self._newObj()) for i in range(10)]
-        value = 0
-        for source, target in edges:
-            self.graph.edges.add(source, target, value)
-            value += 1        
-        values = list(self.graph.edges.iterValues())
-        value = 0
-        for source, target in edges:
-            self.failUnless(value in values)
-            value += 1
-        
-    def testIterItems(self):
-        edges = [(self._newObj(), self._newObj()) for i in range(10)]
-        value = 0
-        for source, target in edges:
-            self.graph.edges.add(source, target, value)
-            value += 1        
-        items = list(self.graph.edges.iterItems())
-        value = 0
-        for source, target in edges:
-            self.failUnless(((source, target), value) in items)
-            value += 1
-    
-    def testIterTopologicalSimple(self):
-        objs = [self._newObj() for i in range(5)]
-        for i in range(4):
-            self.graph.edges.add(objs[i], objs[i+1])
-        objs.reverse()
-        self.assertEqual(list(self.graph.nodes.iterTopological()), objs)
-    
-    def testIterTopologicalWithCycle(self):
-        from pypes.exceptions import GraphCycleError
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.add(ob2, ob3)
-        self.graph.edges.add(ob3, ob1)
-        self.assertRaises(
-            GraphCycleError, list, self.graph.nodes.iterTopological())
-    
-    def testIterTopologicalNonTree(self):
-        objs = [self._newObj() for i in range(7)]
-        self.graph.edges.add(objs[0], objs[1])
-        self.graph.edges.add(objs[1], objs[2])
-        self.graph.edges.add(objs[2], objs[5])
-        self.graph.edges.add(objs[3], objs[0])
-        self.graph.edges.add(objs[3], objs[1])
-        self.graph.edges.add(objs[3], objs[2])
-        self.graph.edges.add(objs[3], objs[4])
-        self.graph.edges.add(objs[3], objs[5])
-        self.graph.edges.add(objs[3], objs[6])
-        self.graph.edges.add(objs[4], objs[5])
-        self.graph.edges.add(objs[6], objs[4])
-        tsort = list(self.graph.nodes.iterTopological())
-        self.assertEqual(len(tsort), len(objs))  
-        self.failUnless(objs[5] is tsort[0])
-        self.failUnless(objs[3] is tsort[-1])
-        idx = tsort.index
-        self.failUnless(idx(objs[0]) > idx(objs[1]))
-        self.failUnless(idx(objs[1]) > idx(objs[2]))
-        self.failUnless(idx(objs[6]) > idx(objs[4]))
-    
-    def testIterTopologicalDisconnected(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.nodes.add(ob1)
-        self.graph.nodes.add(ob2)
-        self.graph.nodes.add(ob3)
-        tsort = list(self.graph.nodes.iterTopological())
-        self.assertEqual(len(tsort), 3)
-        self.failUnless(ob1 in tsort)
-        self.failUnless(ob2 in tsort)
-        self.failUnless(ob3 in tsort)
-       
-    ## XXX Need to add transitiveClosure tests
-    
-    def testDegree(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.add(ob1, ob3)
-        self.graph.edges.add(ob1, ob1)
-        self.graph.edges.add(ob2, ob1)
-        self.assertEqual(self.graph.nodes.degree(ob1), 3)
-        self.assertEqual(self.graph.nodes.degree(ob2), 1)
-        self.assertEqual(self.graph.nodes.degree(ob3), 0)
-    
-    def testTargets(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.add(ob1, ob3)
-        self.graph.edges.add(ob1, ob1)
-        self.graph.edges.add(ob2, ob1)
-        n = list(self.graph.nodes.targets(ob1))
-        objs = [ob1, ob2, ob3]
-        n.sort()
-        objs.sort()
-        self.assertEqual(n, objs)
-        self.assertEqual(list(self.graph.nodes.targets(ob2)), [ob1])
-        self.failIf(self.graph.nodes.targets(ob3))
-    
-    def testSources(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.add(ob3, ob2)
-        self.graph.edges.add(ob1, ob1)
-        self.graph.edges.add(ob2, ob1)
-        self.assertEqual(list(self.graph.nodes.sources(ob1)), [ob1, ob2])
-        self.assertEqual(list(self.graph.nodes.sources(ob2)), [ob1, ob3])
-        self.failIf(self.graph.nodes.sources(ob3))
-     
-    def testTargetsTransitive(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.add(ob2, ob1)
-        self.graph.edges.add(ob2, ob3)
-        self.graph.edges.add(self._newObj(), self._newObj())
-        r = list(self.graph.nodes.targets(ob1, 1))
-        objs = [ob1, ob2, ob3]
-        r.sort()
-        objs.sort()
-        self.assertEqual(r, objs)
-        r = list(self.graph.nodes.targets(ob2, 1))
-        r.sort()
-        self.assertEqual(r, objs)
-        self.failIf(self.graph.nodes.targets(ob3, 1))
-     
-    def testSourcesTransitive(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.add(ob2, ob1)
-        self.graph.edges.add(ob2, ob3)
-        self.graph.edges.add(self._newObj(), self._newObj())
-        self.assertEqual(list(self.graph.nodes.sources(ob3, 1)), [ob1, ob2])
-        self.assertEqual(list(self.graph.nodes.sources(ob2, 1)), [ob1, ob2])
-        self.assertEqual(list(self.graph.nodes.sources(ob1, 1)), [ob1, ob2])
-    
-    def _makeMatrix(self):
-        nodes = {}
-        for y in range(5):
-            for x in range(5):
-                n = nodes[x,y] = self._newObj()
-                if x:
-                    if (x + y) % 2:
-                        self.graph.edges.add(nodes[x-1, y], n, 1)
-                    else:
-                        self.graph.edges.add(n, nodes[x-1, y], 1)
-                if y:
-                    if (x + y) % 2:
-                        self.graph.edges.add(n, nodes[x, y-1], 1)
-                    else:
-                        self.graph.edges.add(nodes[x, y-1], n, 1)
-                elif not x:
-                    self.graph.nodes.add(n)
-        return nodes
-        
-    def testShortestPathMatrixWithoutValues(self):
-        nodes = self._makeMatrix()
-        result = self.graph.shortestPath(nodes[0, 0], nodes[4, 4])
-        self.failIf(result is None)
-        i, x1, y1 = 0, 0, 0
-        path = [nodes[x1, y1]]
-        while x1 < 4 or y1 < 4:
-            if i % 2:
-                y1 += 1
-            else:
-                x1 += 1
-            path.append(nodes[x1, y1])
-            i += 1
-        self.assertEqual(result, path)
-    
-    def testShortestPathUnreachable(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.add(ob3, ob2)
-        self.assertEqual(self.graph.shortestPath(ob1, ob3), None) 
-        
-    def testShortestPathWithValues(self):
-        objs = [self._newObj() for i in range(5)]
-        self.graph.edges.add(objs[0], objs[1], 10)
-        self.graph.edges.add(objs[0], objs[2], 2)
-        self.graph.edges.add(objs[2], objs[1], 10)
-        self.graph.edges.add(objs[2], objs[3], 1)
-        self.graph.edges.add(objs[3], objs[4], 1)
-        self.graph.edges.add(objs[4], objs[1], 3)
-        self.assertEqual(
-            self.graph.shortestPath(objs[0], objs[1], 1),
-            [objs[0], objs[2], objs[3], objs[4], objs[1]])
-    
-    def testShortestPathMatrixWithValues(self):
-        nodes = self._makeMatrix()
-        result = self.graph.shortestPath(nodes[0, 0], nodes[4, 4], 1)
-        self.failIf(result is None)
-        i, x1, y1 = 0, 0, 0
-        path = [nodes[x1, y1]]
-        while x1 < 4 or y1 < 4:
-            if i % 2:
-                y1 += 1
-            else:
-                x1 += 1
-            path.append(nodes[x1, y1])
-            i += 1
-        self.assertEqual(result, path)
-            
-    def testShortestPathWithValuesUnreachable(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2, 3)
-        self.graph.edges.add(ob3, ob2, 4)
-        self.assertEqual(self.graph.shortestPath(ob1, ob3, 1), None)
-    
-    def testShortestPathWithBogusSource(self):
-        from pypes.exceptions import GraphLookupError
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob3, ob2, 3)
-        self.assertRaises(
-            GraphLookupError, self.graph.shortestPath, ob1, ob2)
-    
-    def testShortestPathWithBogusTarget(self):
-        from pypes.exceptions import GraphLookupError
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob3, ob2, 3)
-        self.assertRaises(
-            GraphLookupError, self.graph.shortestPath, ob2, ob1)
-        
-    def testAllPathsSimple(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.add(ob1, ob3)
-        self.graph.edges.add(ob2, ob3)
-        paths = list(self.graph.allPaths())
-        expected = [[ob1, ob2], [ob1, ob2, ob3], [ob1, ob3], [ob2, ob3]]
-        self.assertEqual(sort(paths), sort(expected))
-    
-    def testAllPathsSimpleWithSource(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.add(ob1, ob3)
-        self.graph.edges.add(ob2, ob3)
-        paths = list(self.graph.allPaths(source=ob1))
-        expected = [[ob1, ob2], [ob1, ob2, ob3], [ob1, ob3]]
-        self.assertEqual(sort(paths), sort(expected))
-        paths = list(self.graph.allPaths(source=ob2))
-        self.assertEqual(paths, [[ob2, ob3]])
-        paths = list(self.graph.allPaths(source=ob3))
-        self.assertEqual(paths, [])
-    
-    def testAllPathsSimpleWithTarget(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.add(ob1, ob3)
-        self.graph.edges.add(ob2, ob3)
-        paths = list(self.graph.allPaths(target=ob1))
-        self.assertEqual(paths, [])
-        paths = list(self.graph.allPaths(target=ob2))
-        self.assertEqual(paths, [[ob1, ob2]])
-        paths = list(self.graph.allPaths(target=ob3))
-        expected = [[ob1, ob2, ob3], [ob1, ob3], [ob2, ob3]]
-        self.assertEqual(sort(paths), sort(expected))
-    
-    def testAllPathsSimpleWithSourceAndTarget(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.add(ob1, ob3)
-        self.graph.edges.add(ob2, ob3)
-        paths = list(self.graph.allPaths(ob1, ob2))
-        self.assertEqual(paths, [[ob1, ob2]])
-        paths = list(self.graph.allPaths(ob1, ob3))
-        expected = [[ob1, ob2, ob3], [ob1, ob3], [ob2, ob3]]
-        paths = list(self.graph.allPaths(ob2, ob3))
-        self.assertEqual(paths, [[ob2, ob3]])  
-        paths = list(self.graph.allPaths(ob2, ob1))
-        self.assertEqual(paths, [])  
-        
-    def testAllPathsWithCycle(self):
-        from pypes.graph import cycle
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.add(ob2, ob1)
-        self.graph.edges.add(ob3, ob1)
-        paths = list(self.graph.allPaths())
-        expected = [[ob1, ob2], cycle([ob1, ob2, ob1]), 
-                    [ob2, ob1], cycle([ob2, ob1, ob2]),
-                    [ob3, ob1], [ob3, ob1, ob2], cycle([ob3, ob1, ob2, ob1])]
-        paths.sort()
-        expected.sort()
-        self.assertEqual(paths, expected)
-        for i in range(len(paths)):
-            self.failUnless(type(paths[i]) is type(expected[i]))
-
-    def testAllPathsWithBogusSource(self):
-        from pypes.exceptions import GraphLookupError
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob3, ob2)
-        self.assertRaises(GraphLookupError, self.graph.allPaths(ob1, ob2).next)
-    
-    def testAllPathsWithBogusTarget(self):
-        from pypes.exceptions import GraphLookupError
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob3, ob2)
-        self.assertRaises(GraphLookupError, self.graph.allPaths(ob3, ob1).next)
-
-    def testAllPathsFullyConnected(self):
-        objs = [self._newObj() for i in range(3)]
-        for s in objs:
-            for t in objs:
-                self.graph.edges.add(s, t)
-        allpaths = list(self.graph.allPaths())
-        count = 0
-        for i in objs:
-            for j in objs:
-                self.failUnless([i, j] in allpaths)
-                count += 1
-                if j is not i:
-                    for k in objs:
-                        self.failUnless([i, j, k] in allpaths)
-                        count += 1
-                        if k not in (j, i):
-                            for m in objs:
-                                self.failUnless([i, j, k, m] in allpaths)
-                                count += 1
-        self.assertEqual(count, len(allpaths))
-    
-    def testAllPathsUnconnected(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.nodes.add(ob1)
-        self.graph.nodes.add(ob2)
-        self.graph.nodes.add(ob3)
-        self.assertEqual(list(self.graph.allPaths()), [])
-        
-        
-class TestDirectedIdGraph(PypesTestCase, TestDirectedGraph):
-    
-    def setUp(self):
-        from pypes.graph import DirectedIdGraph
-        self.graph = DirectedIdGraph()
-        PypesTestCase.setUp(self)
-        
-    def testAddNodeUnregistered(self):
-        from pypes.exceptions import IdentityError
-        ob = TestClass()
-        self.assertRaises(IdentityError, self.graph.nodes.add, ob)
-        
-    def testRemoveNodeUnregistered(self):
-        from pypes.exceptions import IdentityError
-        ob = TestClass()
-        self.assertRaises(IdentityError, self.graph.nodes.remove, ob)
-        
-    def testContainsNodeUnregistered(self):
-        from pypes.exceptions import IdentityError
-        ob = TestClass()
-        try:
-            ob in self.graph.nodes
-        except IdentityError:
-            pass
-        except:
-            raise
-        else:
-            self.fail()
-    
-    def testDegreeNodeUnregistered(self):
-        from pypes.exceptions import IdentityError
-        ob = TestClass()
-        self.assertRaises(IdentityError, self.graph.nodes.degree, ob)
-    
-    def testSourcesNodeUnregistered(self):
-        from pypes.exceptions import IdentityError
-        ob = TestClass()
-        self.assertRaises(IdentityError, self.graph.nodes.sources, ob)
-    
-    def testTargetsNodeUnregistered(self):
-        from pypes.exceptions import IdentityError
-        ob = TestClass()
-        self.assertRaises(IdentityError, self.graph.nodes.targets, ob)
-        
-    def testAddEdgeUnregistered(self):
-        from pypes.exceptions import IdentityError
-        ob1, ob2 = TestClass(), self._newObj()
-        self.assertRaises(IdentityError, self.graph.edges.add, ob1, ob2)
-        self.assertRaises(IdentityError, self.graph.edges.add, ob2, ob1)
-        
-    def testRemoveEdgeUnregistered(self):
-        from pypes.exceptions import IdentityError
-        ob1, ob2 = TestClass(), self._newObj()
-        self.assertRaises(IdentityError, self.graph.edges.remove, ob1, ob2)
-        self.assertRaises(IdentityError, self.graph.edges.remove, ob2, ob1)
-        
-    def testGetEdgeValueUnregistered(self):
-        from pypes.exceptions import IdentityError
-        ob1, ob2 = TestClass(), self._newObj()
-        self.assertRaises(IdentityError, self.graph.edges.get, ob1, ob2)
-        self.assertRaises(IdentityError, self.graph.edges.get, ob2, ob1)
-        
-    def testSetEdgeValueUnregistered(self):
-        from pypes.exceptions import IdentityError
-        ob1, ob2 = TestClass(), self._newObj()
-        self.assertRaises(IdentityError, self.graph.edges.set, ob1, ob2, 0)
-        self.assertRaises(IdentityError, self.graph.edges.set, ob2, ob1, 0)
-        
-    def testContainsEdgeUnregistered(self):
-        from pypes.exceptions import IdentityError
-        ob1, ob2 = TestClass(), self._newObj()
-        try:
-            (ob1, ob2) in self.graph.edges
-        except IdentityError:
-            pass
-        except:
-            raise
-        else:
-            self.fail()
-        try:
-            (ob2, ob1) in self.graph.edges
-        except IdentityError:
-            pass
-        except:
-            raise
-        else:
-            self.fail()
-            
-    def testShortestPathUnregistered(self):
-        from pypes.exceptions import IdentityError
-        ob1, ob2 = TestClass(), self._newObj()
-        self.assertRaises(IdentityError, self.graph.shortestPath, ob1, ob2)
-        self.assertRaises(IdentityError, self.graph.shortestPath, ob2, ob1)
-            
-    def testAllPathsUnregistered(self):
-        from pypes.exceptions import IdentityError
-        ob1, ob2 = TestClass(), self._newObj()
-        self.assertRaises(IdentityError, self.graph.allPaths(ob1, ob2).next)
-        
-        
-class TestPQueue(unittest.TestCase):
-    
-    def setUp(self):
-        from pypes.graph import PQueue
-        self.q = PQueue()
-    
-    def testInsertAndPop(self):
-        from random import randint
-        values = [randint(0, 10000) for i in range(10)]
-        for i in values:
-            self.q.insert(str(i), i)
-        values.sort()
-        while self.q:
-            k, m = self.q.popMin()
-            v = values.pop(0)
-            self.assertEqual(k, str(v))
-            self.assertEqual(m, v)
-            k, m = self.q.popMax()
-            v = values.pop()
-            self.assertEqual(k, str(v))
-            self.assertEqual(m, v)
-
-class TestDirectedIdGraphPersistence(PypesPersistentTest):
-    
-    def setUp(self):
-        from pypes.graph import DirectedIdGraph
-        super(TestDirectedIdGraphPersistence, self).setUp()
-        self.root = self.db.open().root()
-        self.root['g'] = self.graph = DirectedIdGraph()
-        get_transaction().commit()
-    
-    def testBasicPersistence(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        self.graph.edges.add(ob1, ob2)
-        self.graph.edges.add(ob1, ob3)
-        self.graph.edges.add(ob2, ob1)
-        self.graph.edges.add(ob2, ob3)
-        get_transaction().commit()
-        
-        g2 = self.db.open().root()['g']
-        self.assertEqual(len(g2.nodes), 3)
-        self.assertEqual(len(g2.edges), 4)
-        self.failUnless((ob1, ob2) in g2.edges)
-        self.failUnless((ob1, ob3) in g2.edges)
-        self.failUnless((ob2, ob1) in g2.edges)
-        self.failUnless((ob2, ob3) in g2.edges)
-        
-    def testConcurrentAddNodes(self):
-        ob1, ob2= self._newObj(), self._newObj()
-        g2 = self.db.open().root()['g']
-        list(g2.nodes); list(g2.edges)
-        
-        self.graph.nodes.add(ob1)
-        get_transaction().commit()
-        
-        g2.nodes.add(ob2)
-        get_transaction().commit()
-    
-    def testConcurrentAddEdges(self):
-        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
-        g2 = self.db.open().root()['g']
-        list(g2.nodes); list(g2.edges)
-        
-        self.graph.edges.add(ob1, ob2)
-        g2.edges.add(ob1, ob2)
-        get_transaction().commit()
-        
-        g2.edges.add(ob2, ob3)
-        get_transaction().commit()     
-        
-if __name__ == '__main__':
-    unittest.main()
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Event service tests
+
+$Id$"""
+
+import unittest
+import ZODB
+from random import randint
+from zope.interface.verify import verifyObject
+from persistent import Persistent
+from common import PypesTestCase, PypesPersistentTest, TestClass, sort
+from pypes import graph
+
+
+class TestDirectedGraph(unittest.TestCase):
+        
+    def _newObj(self):
+        try:
+            self.t += 1
+        except AttributeError:
+            self.t = 0L
+        return self.t
+    
+    def setUp(self):
+        self.graph = graph.DirectedGraph()
+
+    def testInterface(self):
+        from pypes.interfaces import IGraph, IGraphNodes, IGraphEdges
+        self.failUnless(verifyObject(IGraph, self.graph))
+        self.failUnless(verifyObject(IGraphNodes, self.graph.nodes))
+        self.failUnless(verifyObject(IGraphEdges, self.graph.edges))
+            
+    def testAddNodes(self):
+        objs = [self._newObj() for i in range(10)]
+        for o in objs:
+            self.failUnless(self.graph.nodes.add(o))
+        for o in objs:
+            self.failIf(self.graph.nodes.add(o))
+        self.assertEqual(len(objs), len(self.graph.nodes))
+        for o in objs:
+            self.failUnless(o in self.graph.nodes)
+
+    def testAddEdges(self):
+        objs = [self._newObj() for i in range(10)]
+        edges = []
+        for s in range(len(objs)):
+            for t in range(len(objs) - s):
+                edges.append((objs[s], objs[t]))
+        for source, target in edges:
+            self.failUnless(self.graph.edges.add(source, target))
+            self.failUnless(source in self.graph.nodes)
+            self.failUnless(target in self.graph.nodes)
+        for source, target in edges:
+            self.failIf(self.graph.edges.add(source, target))
+        for pair in edges:
+            self.failUnless(pair in self.graph.edges)
+            
+    def testAddAndGetValue(self):
+        edges = [(self._newObj(), self._newObj()) for i in range(10)]
+        data = 0
+        for source, target in edges:
+            self.graph.edges.add(source, target, data)
+            data += 1
+        data = 0
+        for source, target in edges:
+            self.assertEqual(self.graph.edges.get(source, target), data)
+            data += 1
+            
+    def testAddPreservesValue(self):
+        source, target = self._newObj(), self._newObj()
+        self.graph.edges.add(source, target, 42)
+        self.graph.edges.add(source, target)
+        self.assertEqual(self.graph.edges.get(source, target), 42)
+    
+    def testGetValueFailsWithoutValue(self):  # distinct name: a later testGetValueFails def shadowed this test
+        from pypes.exceptions import GraphValueError
+        source, target = self._newObj(), self._newObj()
+        self.graph.edges.add(source, target)  # edge added with no value argument
+        self.assertRaises(
+            GraphValueError, self.graph.edges.get, source, target)
+        
+    def testGetValueDefault(self):
+        source, target = self._newObj(), self._newObj()
+        self.graph.edges.add(source, target)
+        self.assertEqual(
+            self.graph.edges.get(source, target, 'yadda'), 'yadda')        
+            
+    def testRemoveEdges(self):
+        edges = [(self._newObj(), self._newObj()) for i in range(10)]
+        for source, target in edges:
+            self.graph.edges.add(source, target)
+        for edge in edges:
+            self.graph.edges.remove(*edge)
+            self.failIf(edge in self.graph.edges)
+        self.failIf(self.graph.edges)
+        
+    def testRemoveEdgeLeavesNodes(self):
+        source, target = self._newObj(), self._newObj()
+        self.graph.edges.add(source, target)
+        self.graph.edges.remove(source, target)
+        self.failUnless(source in self.graph.nodes)
+        self.failUnless(target in self.graph.nodes)
+           
+    def testRemoveEdgeFails(self):
+        from pypes.exceptions import GraphLookupError
+        source, target = self._newObj(), self._newObj()
+        self.assertRaises(
+            GraphLookupError, self.graph.edges.remove, source, target)
+            
+    def testRemoveEdgeRemovesValue(self):
+        from pypes.exceptions import GraphLookupError
+        source, target = self._newObj(), self._newObj()
+        self.graph.edges.add(source, target, 101)
+        self.graph.edges.remove(source, target)
+        self.assertRaises(
+            GraphLookupError, self.graph.edges.get, source, target)
+        
+    def testRemoveNode(self):
+        objs = [self._newObj() for i in range(10)]
+        for o in objs:
+            self.graph.nodes.add(o)
+        for o in objs:
+            self.graph.nodes.remove(o)
+            self.failIf(o in self.graph.nodes)
+        self.failIf(self.graph.nodes)
+
+    def testRemoveNodeRemovesEdges(self):
+        obj = [self._newObj() for i in range(5)]
+        self.graph.edges.add(obj[0], obj[1])
+        self.graph.edges.add(obj[0], obj[2])
+        self.graph.edges.add(obj[1], obj[4])
+        self.graph.edges.add(obj[3], obj[1])
+        self.graph.edges.add(obj[4], obj[0])
+        self.graph.nodes.remove(obj[4])
+        self.failIf((obj[4], obj[0]) in self.graph.edges)
+        self.failIf((obj[1], obj[4]) in self.graph.edges)
+        self.failUnless((obj[0], obj[1]) in self.graph.edges)
+        self.failUnless((obj[0], obj[2]) in self.graph.edges)
+        self.failUnless((obj[3], obj[1]) in self.graph.edges)
+        self.graph.nodes.remove(obj[0])
+        self.failIf((obj[0], obj[1]) in self.graph.edges)
+        self.failIf((obj[0], obj[2]) in self.graph.edges)
+        self.failUnless((obj[3], obj[1]) in self.graph.edges)
+        self.assertEqual(len(self.graph.edges), 1)
+        self.assertEqual(len(self.graph.nodes), 3)
+
+    def testRemoveNodeFails(self):
+        from pypes.exceptions import GraphLookupError
+        self.assertRaises(
+            GraphLookupError, self.graph.nodes.remove, self._newObj())        
+    
+    def testGetValueFails(self):
+        from pypes.exceptions import GraphLookupError
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.assertRaises(
+            GraphLookupError, self.graph.edges.get, ob1, ob2)
+        self.graph.edges.add(ob1, ob2)
+        self.assertRaises(
+            GraphLookupError, self.graph.edges.get, ob1, ob3)
+        self.assertRaises(
+            GraphLookupError, self.graph.edges.get, ob3, ob2)
+        self.assertRaises(
+            GraphLookupError, self.graph.edges.get, ob2, ob1)
+    
+    def testAddReplacesValue(self):
+        from pypes.exceptions import GraphValueError
+        ob1, ob2 = self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.assertRaises(
+            GraphValueError, self.graph.edges.get, ob1, ob2)
+        self.graph.edges.add(ob1, ob2, 'data!')
+        self.assertEqual(self.graph.edges.get(ob1, ob2), 'data!')
+        self.graph.edges.add(ob1, ob2)
+        self.assertEqual(self.graph.edges.get(ob1, ob2), 'data!')
+    
+    def testSetValue(self):
+        ob1, ob2 = self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.set(ob1, ob2, '!atad')
+        self.assertEqual(self.graph.edges.get(ob1, ob2), '!atad')
+        
+    def testSetValueFails(self):
+        from pypes.exceptions import GraphLookupError
+        ob1, ob2 = self._newObj(), self._newObj()
+        self.assertRaises(
+            GraphLookupError, self.graph.edges.set, ob1, ob2, 'sorry')
+        
+    def testEdgesContains(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob2, ob3)
+        self.failUnless((ob1, ob2) in self.graph.edges)
+        self.failIf((ob2, ob1) in self.graph.edges)
+        self.failUnless((ob2, ob3) in self.graph.edges)
+        self.failIf((ob1, ob3) in self.graph.edges)
+        
+    def testIterNodes(self):
+        objs = [self._newObj() for i in range(10)]
+        for o in objs:
+            self.graph.nodes.add(o)
+        fromiter = list(self.graph.nodes)
+        objs.sort()
+        fromiter.sort()
+        self.assertEqual(objs, fromiter)
+    
+    def testIterEdges(self):
+        edges = [(self._newObj(), self._newObj()) for i in range(10)]
+        for source, target in edges:
+            self.graph.edges.add(source, target)
+        fromiter = list(self.graph.edges)
+        edges.sort()
+        fromiter.sort()
+        self.assertEqual(edges, fromiter)
+        
+    def testIterValues(self):
+        edges = [(self._newObj(), self._newObj()) for i in range(10)]
+        value = 0
+        for source, target in edges:
+            self.graph.edges.add(source, target, value)
+            value += 1        
+        values = list(self.graph.edges.iterValues())
+        value = 0
+        for source, target in edges:
+            self.failUnless(value in values)
+            value += 1
+        
+    def testIterItems(self):
+        edges = [(self._newObj(), self._newObj()) for i in range(10)]
+        value = 0
+        for source, target in edges:
+            self.graph.edges.add(source, target, value)
+            value += 1        
+        items = list(self.graph.edges.iterItems())
+        value = 0
+        for source, target in edges:
+            self.failUnless(((source, target), value) in items)
+            value += 1
+    
+    def testIterTopologicalSimple(self):
+        objs = [self._newObj() for i in range(5)]
+        for i in range(4):
+            self.graph.edges.add(objs[i], objs[i+1])
+        objs.reverse()
+        self.assertEqual(list(self.graph.nodes.iterTopological()), objs)
+    
+    def testIterTopologicalWithCycle(self):
+        from pypes.exceptions import GraphCycleError
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob2, ob3)
+        self.graph.edges.add(ob3, ob1)
+        self.assertRaises(
+            GraphCycleError, list, self.graph.nodes.iterTopological())
+    
+    def testIterTopologicalNonTree(self):
+        objs = [self._newObj() for i in range(7)]
+        self.graph.edges.add(objs[0], objs[1])
+        self.graph.edges.add(objs[1], objs[2])
+        self.graph.edges.add(objs[2], objs[5])
+        self.graph.edges.add(objs[3], objs[0])
+        self.graph.edges.add(objs[3], objs[1])
+        self.graph.edges.add(objs[3], objs[2])
+        self.graph.edges.add(objs[3], objs[4])
+        self.graph.edges.add(objs[3], objs[5])
+        self.graph.edges.add(objs[3], objs[6])
+        self.graph.edges.add(objs[4], objs[5])
+        self.graph.edges.add(objs[6], objs[4])
+        tsort = list(self.graph.nodes.iterTopological())
+        self.assertEqual(len(tsort), len(objs))  
+        self.failUnless(objs[5] is tsort[0])
+        self.failUnless(objs[3] is tsort[-1])
+        idx = tsort.index
+        self.failUnless(idx(objs[0]) > idx(objs[1]))
+        self.failUnless(idx(objs[1]) > idx(objs[2]))
+        self.failUnless(idx(objs[6]) > idx(objs[4]))
+    
+    def testIterTopologicalDisconnected(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.nodes.add(ob1)
+        self.graph.nodes.add(ob2)
+        self.graph.nodes.add(ob3)
+        tsort = list(self.graph.nodes.iterTopological())
+        self.assertEqual(len(tsort), 3)
+        self.failUnless(ob1 in tsort)
+        self.failUnless(ob2 in tsort)
+        self.failUnless(ob3 in tsort)
+       
+    ## XXX Need to add transitiveClosure tests
+    
+    def testDegree(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob1, ob3)
+        self.graph.edges.add(ob1, ob1)
+        self.graph.edges.add(ob2, ob1)
+        self.assertEqual(self.graph.nodes.degree(ob1), 3)
+        self.assertEqual(self.graph.nodes.degree(ob2), 1)
+        self.assertEqual(self.graph.nodes.degree(ob3), 0)
+    
+    def testTargets(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob1, ob3)
+        self.graph.edges.add(ob1, ob1)
+        self.graph.edges.add(ob2, ob1)
+        n = list(self.graph.nodes.targets(ob1))
+        objs = [ob1, ob2, ob3]
+        n.sort()
+        objs.sort()
+        self.assertEqual(n, objs)
+        self.assertEqual(list(self.graph.nodes.targets(ob2)), [ob1])
+        self.failIf(self.graph.nodes.targets(ob3))
+    
+    def testSources(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob3, ob2)
+        self.graph.edges.add(ob1, ob1)
+        self.graph.edges.add(ob2, ob1)
+        self.assertEqual(list(self.graph.nodes.sources(ob1)), [ob1, ob2])
+        self.assertEqual(list(self.graph.nodes.sources(ob2)), [ob1, ob3])
+        self.failIf(self.graph.nodes.sources(ob3))
+     
+    def testTargetsTransitive(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob2, ob1)
+        self.graph.edges.add(ob2, ob3)
+        self.graph.edges.add(self._newObj(), self._newObj())
+        r = list(self.graph.nodes.targets(ob1, 1))
+        objs = [ob1, ob2, ob3]
+        r.sort()
+        objs.sort()
+        self.assertEqual(r, objs)
+        r = list(self.graph.nodes.targets(ob2, 1))
+        r.sort()
+        self.assertEqual(r, objs)
+        self.failIf(self.graph.nodes.targets(ob3, 1))
+     
+    def testSourcesTransitive(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob2, ob1)
+        self.graph.edges.add(ob2, ob3)
+        self.graph.edges.add(self._newObj(), self._newObj())
+        self.assertEqual(list(self.graph.nodes.sources(ob3, 1)), [ob1, ob2])
+        self.assertEqual(list(self.graph.nodes.sources(ob2, 1)), [ob1, ob2])
+        self.assertEqual(list(self.graph.nodes.sources(ob1, 1)), [ob1, ob2])
+    
+    def _makeMatrix(self):
+        nodes = {}
+        for y in range(5):
+            for x in range(5):
+                n = nodes[x,y] = self._newObj()
+                if x:
+                    if (x + y) % 2:
+                        self.graph.edges.add(nodes[x-1, y], n, 1)
+                    else:
+                        self.graph.edges.add(n, nodes[x-1, y], 1)
+                if y:
+                    if (x + y) % 2:
+                        self.graph.edges.add(n, nodes[x, y-1], 1)
+                    else:
+                        self.graph.edges.add(nodes[x, y-1], n, 1)
+                elif not x:
+                    self.graph.nodes.add(n)
+        return nodes
+        
+    def testShortestPathMatrixWithoutValues(self):
+        nodes = self._makeMatrix()
+        result = self.graph.shortestPath(nodes[0, 0], nodes[4, 4])
+        self.failIf(result is None)
+        i, x1, y1 = 0, 0, 0
+        path = [nodes[x1, y1]]
+        while x1 < 4 or y1 < 4:
+            if i % 2:
+                y1 += 1
+            else:
+                x1 += 1
+            path.append(nodes[x1, y1])
+            i += 1
+        self.assertEqual(result, path)
+    
+    def testShortestPathUnreachable(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob3, ob2)
+        self.assertEqual(self.graph.shortestPath(ob1, ob3), None) 
+        
+    def testShortestPathWithValues(self):
+        objs = [self._newObj() for i in range(5)]
+        self.graph.edges.add(objs[0], objs[1], 10)
+        self.graph.edges.add(objs[0], objs[2], 2)
+        self.graph.edges.add(objs[2], objs[1], 10)
+        self.graph.edges.add(objs[2], objs[3], 1)
+        self.graph.edges.add(objs[3], objs[4], 1)
+        self.graph.edges.add(objs[4], objs[1], 3)
+        self.assertEqual(
+            self.graph.shortestPath(objs[0], objs[1], 1),
+            [objs[0], objs[2], objs[3], objs[4], objs[1]])
+    
+    def testShortestPathMatrixWithValues(self):
+        nodes = self._makeMatrix()
+        result = self.graph.shortestPath(nodes[0, 0], nodes[4, 4], 1)
+        self.failIf(result is None)
+        i, x1, y1 = 0, 0, 0
+        path = [nodes[x1, y1]]
+        while x1 < 4 or y1 < 4:
+            if i % 2:
+                y1 += 1
+            else:
+                x1 += 1
+            path.append(nodes[x1, y1])
+            i += 1
+        self.assertEqual(result, path)
+            
+    def testShortestPathWithValuesUnreachable(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2, 3)
+        self.graph.edges.add(ob3, ob2, 4)
+        self.assertEqual(self.graph.shortestPath(ob1, ob3, 1), None)
+    
+    def testShortestPathWithBogusSource(self):
+        from pypes.exceptions import GraphLookupError
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob3, ob2, 3)
+        self.assertRaises(
+            GraphLookupError, self.graph.shortestPath, ob1, ob2)
+    
+    def testShortestPathWithBogusTarget(self):
+        from pypes.exceptions import GraphLookupError
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob3, ob2, 3)
+        self.assertRaises(
+            GraphLookupError, self.graph.shortestPath, ob2, ob1)
+        
+    def testAllPathsSimple(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob1, ob3)
+        self.graph.edges.add(ob2, ob3)
+        paths = list(self.graph.allPaths())
+        expected = [[ob1, ob2], [ob1, ob2, ob3], [ob1, ob3], [ob2, ob3]]
+        self.assertEqual(sort(paths), sort(expected))
+    
+    def testAllPathsSimpleWithSource(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob1, ob3)
+        self.graph.edges.add(ob2, ob3)
+        paths = list(self.graph.allPaths(source=ob1))
+        expected = [[ob1, ob2], [ob1, ob2, ob3], [ob1, ob3]]
+        self.assertEqual(sort(paths), sort(expected))
+        paths = list(self.graph.allPaths(source=ob2))
+        self.assertEqual(paths, [[ob2, ob3]])
+        paths = list(self.graph.allPaths(source=ob3))
+        self.assertEqual(paths, [])
+    
+    def testAllPathsSimpleWithTarget(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob1, ob3)
+        self.graph.edges.add(ob2, ob3)
+        paths = list(self.graph.allPaths(target=ob1))
+        self.assertEqual(paths, [])
+        paths = list(self.graph.allPaths(target=ob2))
+        self.assertEqual(paths, [[ob1, ob2]])
+        paths = list(self.graph.allPaths(target=ob3))
+        expected = [[ob1, ob2, ob3], [ob1, ob3], [ob2, ob3]]
+        self.assertEqual(sort(paths), sort(expected))
+    
+    def testAllPathsSimpleWithSourceAndTarget(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob1, ob3)
+        self.graph.edges.add(ob2, ob3)
+        paths = list(self.graph.allPaths(ob1, ob2))
+        self.assertEqual(paths, [[ob1, ob2]])
+        paths = list(self.graph.allPaths(ob1, ob3))  # direct edge plus the route via ob2
+        self.assertEqual(sort(paths), sort([[ob1, ob2, ob3], [ob1, ob3]]))
+        paths = list(self.graph.allPaths(ob2, ob3))
+        self.assertEqual(paths, [[ob2, ob3]])
+        paths = list(self.graph.allPaths(ob2, ob1))  # no edge added above targets ob1
+        self.assertEqual(paths, [])
+        
+    def testAllPathsWithCycle(self):
+        from pypes.graph import cycle
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob2, ob1)
+        self.graph.edges.add(ob3, ob1)
+        paths = list(self.graph.allPaths())
+        expected = [[ob1, ob2], cycle([ob1, ob2, ob1]), 
+                    [ob2, ob1], cycle([ob2, ob1, ob2]),
+                    [ob3, ob1], [ob3, ob1, ob2], cycle([ob3, ob1, ob2, ob1])]
+        paths.sort()
+        expected.sort()
+        self.assertEqual(paths, expected)
+        for i in range(len(paths)):
+            self.failUnless(type(paths[i]) is type(expected[i]))
+
+    def testAllPathsWithBogusSource(self):
+        from pypes.exceptions import GraphLookupError
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob3, ob2)
+        self.assertRaises(GraphLookupError, self.graph.allPaths(ob1, ob2).next)
+    
+    def testAllPathsWithBogusTarget(self):
+        from pypes.exceptions import GraphLookupError
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob3, ob2)
+        self.assertRaises(GraphLookupError, self.graph.allPaths(ob3, ob1).next)
+
+    def testAllPathsFullyConnected(self):
+        objs = [self._newObj() for i in range(3)]
+        for s in objs:
+            for t in objs:
+                self.graph.edges.add(s, t)
+        allpaths = list(self.graph.allPaths())
+        count = 0
+        for i in objs:
+            for j in objs:
+                self.failUnless([i, j] in allpaths)
+                count += 1
+                if j is not i:
+                    for k in objs:
+                        self.failUnless([i, j, k] in allpaths)
+                        count += 1
+                        if k not in (j, i):
+                            for m in objs:
+                                self.failUnless([i, j, k, m] in allpaths)
+                                count += 1
+        self.assertEqual(count, len(allpaths))
+    
+    def testAllPathsUnconnected(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.nodes.add(ob1)
+        self.graph.nodes.add(ob2)
+        self.graph.nodes.add(ob3)
+        self.assertEqual(list(self.graph.allPaths()), [])
+        
+        
+class TestDirectedIdGraph(PypesTestCase, TestDirectedGraph):
+    
+    def setUp(self):
+        from pypes.graph import DirectedIdGraph
+        self.graph = DirectedIdGraph()
+        PypesTestCase.setUp(self)
+        
+    def testAddNodeUnregistered(self):
+        from pypes.exceptions import IdentityError
+        ob = TestClass()
+        self.assertRaises(IdentityError, self.graph.nodes.add, ob)
+        
+    def testRemoveNodeUnregistered(self):
+        from pypes.exceptions import IdentityError
+        ob = TestClass()
+        self.assertRaises(IdentityError, self.graph.nodes.remove, ob)
+        
+    def testContainsNodeUnregistered(self):
+        from pypes.exceptions import IdentityError
+        ob = TestClass()
+        try:
+            ob in self.graph.nodes
+        except IdentityError:
+            pass
+        except:
+            raise
+        else:
+            self.fail()
+    
+    def testDegreeNodeUnregistered(self):
+        from pypes.exceptions import IdentityError
+        ob = TestClass()
+        self.assertRaises(IdentityError, self.graph.nodes.degree, ob)
+    
+    def testSourcesNodeUnregistered(self):
+        from pypes.exceptions import IdentityError
+        ob = TestClass()
+        self.assertRaises(IdentityError, self.graph.nodes.sources, ob)
+    
+    def testTargetsNodeUnregistered(self):
+        from pypes.exceptions import IdentityError
+        ob = TestClass()
+        self.assertRaises(IdentityError, self.graph.nodes.targets, ob)
+        
+    def testAddEdgeUnregistered(self):
+        from pypes.exceptions import IdentityError
+        ob1, ob2 = TestClass(), self._newObj()
+        self.assertRaises(IdentityError, self.graph.edges.add, ob1, ob2)
+        self.assertRaises(IdentityError, self.graph.edges.add, ob2, ob1)
+        
+    def testRemoveEdgeUnregistered(self):
+        from pypes.exceptions import IdentityError
+        ob1, ob2 = TestClass(), self._newObj()
+        self.assertRaises(IdentityError, self.graph.edges.remove, ob1, ob2)
+        self.assertRaises(IdentityError, self.graph.edges.remove, ob2, ob1)
+        
+    def testGetEdgeValueUnregistered(self):
+        from pypes.exceptions import IdentityError
+        ob1, ob2 = TestClass(), self._newObj()
+        self.assertRaises(IdentityError, self.graph.edges.get, ob1, ob2)
+        self.assertRaises(IdentityError, self.graph.edges.get, ob2, ob1)
+        
+    def testSetEdgeValueUnregistered(self):
+        from pypes.exceptions import IdentityError
+        ob1, ob2 = TestClass(), self._newObj()
+        self.assertRaises(IdentityError, self.graph.edges.set, ob1, ob2, 0)
+        self.assertRaises(IdentityError, self.graph.edges.set, ob2, ob1, 0)
+        
+    def testContainsEdgeUnregistered(self):
+        from pypes.exceptions import IdentityError
+        ob1, ob2 = TestClass(), self._newObj()
+        try:
+            (ob1, ob2) in self.graph.edges
+        except IdentityError:
+            pass
+        except:
+            raise
+        else:
+            self.fail()
+        try:
+            (ob2, ob1) in self.graph.edges
+        except IdentityError:
+            pass
+        except:
+            raise
+        else:
+            self.fail()
+            
+    def testShortestPathUnregistered(self):
+        from pypes.exceptions import IdentityError
+        ob1, ob2 = TestClass(), self._newObj()
+        self.assertRaises(IdentityError, self.graph.shortestPath, ob1, ob2)
+        self.assertRaises(IdentityError, self.graph.shortestPath, ob2, ob1)
+            
+    def testAllPathsUnregistered(self):
+        from pypes.exceptions import IdentityError
+        ob1, ob2 = TestClass(), self._newObj()
+        self.assertRaises(IdentityError, self.graph.allPaths(ob1, ob2).next)
+        
+        
+class TestPQueue(unittest.TestCase):
+    
+    def setUp(self):
+        from pypes.graph import PQueue
+        self.q = PQueue()
+    
+    def testInsertAndPop(self):
+        from random import randint
+        values = [randint(0, 10000) for i in range(10)]
+        for i in values:
+            self.q.insert(str(i), i)
+        values.sort()
+        while self.q:
+            k, m = self.q.popMin()
+            v = values.pop(0)
+            self.assertEqual(k, str(v))
+            self.assertEqual(m, v)
+            k, m = self.q.popMax()
+            v = values.pop()
+            self.assertEqual(k, str(v))
+            self.assertEqual(m, v)
+
+class TestDirectedIdGraphPersistence(PypesPersistentTest):
+    
+    def setUp(self):
+        from pypes.graph import DirectedIdGraph
+        super(TestDirectedIdGraphPersistence, self).setUp()
+        self.root = self.db.open().root()
+        self.root['g'] = self.graph = DirectedIdGraph()
+        get_transaction().commit()
+    
+    def testBasicPersistence(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        self.graph.edges.add(ob1, ob2)
+        self.graph.edges.add(ob1, ob3)
+        self.graph.edges.add(ob2, ob1)
+        self.graph.edges.add(ob2, ob3)
+        get_transaction().commit()
+        
+        g2 = self.db.open().root()['g']
+        self.assertEqual(len(g2.nodes), 3)
+        self.assertEqual(len(g2.edges), 4)
+        self.failUnless((ob1, ob2) in g2.edges)
+        self.failUnless((ob1, ob3) in g2.edges)
+        self.failUnless((ob2, ob1) in g2.edges)
+        self.failUnless((ob2, ob3) in g2.edges)
+        
+    def testConcurrentAddNodes(self):
+        ob1, ob2= self._newObj(), self._newObj()
+        g2 = self.db.open().root()['g']
+        list(g2.nodes); list(g2.edges)
+        
+        self.graph.nodes.add(ob1)
+        get_transaction().commit()
+        
+        g2.nodes.add(ob2)
+        get_transaction().commit()
+    
+    def testConcurrentAddEdges(self):
+        ob1, ob2, ob3 = self._newObj(), self._newObj(), self._newObj()
+        g2 = self.db.open().root()['g']
+        list(g2.nodes); list(g2.edges)
+        
+        self.graph.edges.add(ob1, ob2)
+        g2.edges.add(ob1, ob2)
+        get_transaction().commit()
+        
+        g2.edges.add(ob2, ob3)
+        get_transaction().commit()     
+        
+if __name__ == '__main__':
+    unittest.main()


=== Packages/pypes/pypes/tests/test_identity.py 1.5 => 1.6 ===
--- Packages/pypes/pypes/tests/test_identity.py:1.5	Sun Feb  8 22:56:22 2004
+++ Packages/pypes/pypes/tests/test_identity.py	Mon Feb  9 16:14:58 2004
@@ -1,637 +1,637 @@
-##############################################################################
-#
-# Copyright (c) 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Identity service tests
-
-$Id$"""
-
-import ZODB
-from ZODB.POSException import ConflictError
-import unittest
-from random import randint
-from zope.interface.verify import verifyObject
-from persistent import Persistent
-from BTrees.OOBTree import OOTreeSet, difference
-from types import TypeType
-from common \
-    import PypesTestCase, TestClass, PersistenceTest, PypesPersistentTest
-
-class TestIdentityService(unittest.TestCase):
-    
-    def setUp(self):
-        from pypes.identity import IdentityService
-        self.identity = IdentityService()
-
-    def testInterfaces(self):
-        from pypes.interfaces import IPersistentService, IIdentityService
-        self.failUnless(verifyObject(IPersistentService, self.identity))
-        self.failUnless(verifyObject(IIdentityService, self.identity))
-        
-    def testIdTypeReturnsType(self):
-        self.failUnless(isinstance(self.identity.idType(), TypeType))
-        
-    def testIdIsCorrectType(self):
-        ident = self.identity.register(TestClass())
-        self.failUnless(isinstance(ident, self.identity.idType()))
-
-    def testIdIsTrue(self):
-        self.failUnless(self.identity.register(TestClass()))
-        
-    def testIdsUniqueAndOrderable(self):
-        ids = OOTreeSet()
-        register = self.identity.register
-        for i in xrange(1000):
-            self.failUnless(ids.insert(register(TestClass())))
-        self.assertEqual(len(ids), 1000)
-    
-    def testRegisterFailure(self):
-        from pypes.exceptions import IdentityError
-        self.assertRaises(
-            IdentityError, self.identity.register, None)
-            
-    def testDuplicateRegistrationFails(self):
-        from pypes.exceptions import IdentityError
-        o = TestClass()
-        self.identity.register(o)
-        self.assertRaises(
-            IdentityError, self.identity.register, o)
-        
-    def testHasId(self):
-        ids = []
-        register = self.identity.register
-        for i in xrange(1000):
-            ids.append(register(TestClass()))
-        ids.sort()
-        hasId = self.identity.hasId
-        for i in ids:
-            self.failUnless(hasId(i))
-            
-    def testNotHasId(self):
-        from pypes.identity import IdentityService
-        altidentity = IdentityService()
-        register = self.identity.register
-        ids = OOTreeSet()
-        for i in xrange(500):
-            ids.insert(register(TestClass()))
-        hasId = self.identity.hasId
-        for i in xrange(500):
-            ident = altidentity.register(TestClass())
-            self.failUnless(
-                bool(hasId(ident)) == bool(ids.has_key(ident)))
-        self.failIf(hasId(0))
-    
-    def testHasIdWrongType(self):
-        self.assertRaises(TypeError, self.identity.hasId, None)
-        self.assertRaises(TypeError, self.identity.hasId, TestClass)
-        self.assertRaises(TypeError, self.identity.hasId, TestClass())
-            
-    def testContains(self):
-        objs = []
-        register = self.identity.register
-        for i in xrange(1000):
-            o = TestClass()
-            objs.append(o)
-            register(o)
-        while objs:
-            o = objs.pop(randint(0, len(objs) - 1))
-            self.failUnless(o in self.identity)
-    
-    def testNotContains(self):
-        register = self.identity.register
-        for i in xrange(333):
-            self.failIf(TestClass() in self.identity)
-        for i in xrange(333):
-            register(TestClass())
-        for i in xrange(333):
-            self.failIf(TestClass() in self.identity)
-        
-    def testRemoveId(self):
-        from pypes.exceptions import IdentityKeyError
-        from pypes.identity import pypesid
-        o = TestClass()
-        self.failIf(o in self.identity)
-        ident = self.identity.register(o)
-        self.failUnless(o in self.identity)
-        self.identity.removeId(ident)
-        self.failIf(o in self.identity)
-        self.assertRaises(IdentityKeyError, self.identity.removeId, ident)
-        self.failUnless(pypesid(o) is None)
-        
-    def testRemoveObject(self):
-        from pypes.exceptions import IdentityError
-        from pypes.identity import pypesid
-        o = TestClass()
-        self.failIf(o in self.identity)
-        self.identity.register(o)
-        self.failUnless(o in self.identity)
-        self.identity.remove(o)
-        self.failIf(o in self.identity)
-        self.assertRaises(IdentityError, self.identity.remove, o)
-        self.failUnless(pypesid(o) is None)
-    
-    def testGetObject(self):
-        ids = []
-        objs = {}
-        register = self.identity.register
-        for i in xrange(1000):
-            o = TestClass()
-            ident = register(o)
-            ids.append(ident)
-            objs[ident] = o
-            o.id = ident
-        ids.sort()
-        getObject = self.identity.getObject
-        for ident in ids:
-            self.assertEqual(getObject(ident).id, ident)
-            self.failUnless(getObject(ident) is objs[ident])
-    
-    def testGetObjectFails(self):
-        from pypes.identity import IdentityService
-        from pypes.exceptions import IdentityKeyError
-        altidentity = IdentityService()
-        ident = self.identity.register(TestClass())
-        badident = altidentity.register(TestClass())
-        while ident == badident:
-            badident = altidentity.register(TestClass())
-        self.assertRaises(IdentityKeyError, self.identity.getObject, badident)
-    
-    def testLen(self):
-        self.failIf(self.identity)
-        ident = self.identity.register(TestClass())
-        self.assertEqual(len(self.identity), 1)
-        self.identity.removeId(ident)
-        self.assertEqual(len(self.identity), 0)
-        for i in xrange(100):
-            self.identity.register(TestClass())
-            self.assertEqual(len(self.identity), i + 1)
-
-    def testIterate(self):
-        objs = {}
-        register = self.identity.register
-        for i in xrange(1000):
-            o = TestClass()
-            objs[o] = register(o)
-        for o in self.identity:
-            del objs[o]
-        self.failIf(objs)
-        
-    def testIterateIds(self):
-        ids = {}
-        register = self.identity.register
-        for i in xrange(1000):
-            ids[register(TestClass())] = None
-        for i in self.identity.iterIds():
-            del ids[i]
-        self.failIf(ids)
-        
-    def testIdSet(self):
-        self.failIf(self.identity.idSet())
-        ids = OOTreeSet()
-        register = self.identity.register
-        for i in xrange(100):
-            ids.insert(register(TestClass()))
-        self.failIf(difference(OOTreeSet(self.identity.idSet()), ids))
-        
-class TestListener(object):
-    
-    def __init__(self):
-        self.received = []
-    
-    def receive(self, msg):
-        self.received.append(msg)
-        
-class TestIdentityServiceEvents(PypesTestCase):
-    
-    def testIdRegisteredEvent(self):
-        from pypes import services
-        from pypes.identity import pypesid, IdRegisteredMessage
-        event = services.event(self.conn)
-        tl = TestListener()
-        event.registerListener(tl, 'receive', IdRegisteredMessage)
-        o = self._newObj()
-        msg, = tl.received
-        self.failUnless(msg.object is o)
-        self.failUnless(msg.id is pypesid(o))        
-
-    def testIdRegisteredEventMultiple(self):
-        from pypes import services
-        from pypes.identity import IdRegisteredMessage
-        event = services.event(self.conn)
-        tl = TestListener()
-        event.registerListener(tl, 'receive', IdRegisteredMessage)
-        objs = [self._newObj() for i in range(10)]
-        self.assertEqual(len(tl.received), 10)
-        for o in objs:
-            self.identity.remove(o)
-        self.assertEqual(len(tl.received), 10)
-    
-    def testIdUnregisteredEvent(self):
-        from pypes import services
-        from pypes.identity import pypesid, IdUnregisteredMessage
-        event = services.event(self.conn)
-        tl = TestListener()
-        event.registerListener(tl, 'receive', IdUnregisteredMessage)
-        o = self._newObj()
-        id = pypesid(o)
-        self.assertEqual(len(tl.received), 0)
-        self.identity.remove(o)
-        msg, = tl.received
-        self.failUnless(msg.object is o)
-        self.failUnless(msg.id is id) 
-
-    def testIdUnregisteredEventMultiple(self):
-        from pypes import services
-        from pypes.identity import IdUnregisteredMessage
-        event = services.event(self.conn)
-        tl = TestListener()
-        event.registerListener(tl, 'receive', IdUnregisteredMessage)
-        objs = [self._newObj() for i in range(10)]
-        self.assertEqual(len(tl.received), 0)
-        for o in objs:
-            self.identity.remove(o)
-        self.assertEqual(len(tl.received), 10)
-    
-    def testListenForIdEventsFunc(self):
-        from pypes import services
-        from pypes.identity import listenForIdEvents
-        from pypes.identity import IdRegisteredMessage, IdUnregisteredMessage
-        event = services.event(self.conn)
-        tl = TestListener()
-        listenForIdEvents(tl, self.conn, 'receive', 'receive')
-        self.assertEqual(len(tl.received), 0)
-        obj = self._newObj()
-        self.assertEqual(len(tl.received), 1)
-        self.identity.remove(obj)
-        msg1, msg2 = tl.received        
-        self.failUnless(msg1.object is obj)
-        self.failUnless(isinstance(msg1, IdRegisteredMessage))
-        self.failUnless(msg2.object is obj)
-        self.failUnless(isinstance(msg2, IdUnregisteredMessage))
-        
-class TestIdentityServicePersistence(PersistenceTest):
-    
-    def testBasicPersistence(self):
-        from pypes.identity import IdentityService, pypesid
-        r1 = self.db.open().root()
-        ids = r1['t'] = IdentityService()
-        obj = TestClass()
-        obj.name = 'test'
-        objid = ids.register(obj)
-        get_transaction().commit()
-        
-        r2 = self.db.open().root()
-        ids2 = r2['t']
-        obj = ids2.getObject(objid)
-        self.assertEqual(ids._p_serial, ids._p_serial)
-        self.failUnless(isinstance(obj, TestClass))
-        self.assertEqual(obj.name, 'test')
-        self.assertEqual(pypesid(obj), objid)
-        
-    def testConcurrentRegistration(self):
-        from pypes.identity import IdentityService
-        r1 = self.db.open().root()
-        ids1 = r1['t'] = IdentityService()
-        get_transaction().commit()
-        
-        r2 = self.db.open().root()
-        ids2 = r2['t']
-        list(ids2); list(ids2.iterIds()) # Make sure its loaded
-        
-        ids1.register(TestClass())
-        get_transaction().commit()
-        
-        ids2.register(TestClass())
-        get_transaction().commit()
-        
-class TestIdentitySet(PypesTestCase):
-    
-    def setUp(self):
-        from pypes.identity import IdentitySet
-        PypesTestCase.setUp(self)
-        self.set = IdentitySet()
-    
-    def testInterfaces(self):
-        from pypes.interfaces import IIdentitySet
-        self.failUnless(verifyObject(IIdentitySet, self.set))
-        
-    def testFromUniqueSequence(self):
-        from pypes.identity import IdentitySet
-        objs = [self._newObj() for i in xrange(100)]
-        set = IdentitySet(objs)
-        for o in objs:
-            self.failUnless(o in set)
-            
-    def testFromNonUniqueSequence(self):
-        from pypes.identity import IdentitySet
-        objs = [self._newObj() for i in xrange(10)]
-        objs.extend(objs)
-        del objs[7:13]
-        set = IdentitySet(objs)
-        seen = {}
-        for o in objs:
-            self.failIf(o not in set)
-            seen[o] = None
-        seen = seen.keys()
-        seen.sort()
-        set = list(set)
-        set.sort()
-        self.assertEqual(seen, set)
-    
-    def testAddUnique(self):
-        objs = []
-        for i in xrange(100):
-            o = self._newObj()
-            self.failUnless(self.set.add(o))
-            objs.append(o)
-        for o in objs:
-            self.failUnless(o in self.set)
-        self.assertEqual(len(objs), len(self.set))
-    
-    def testAddDupes(self):
-        for i in xrange(100):
-            o = self._newObj()
-            self.failUnless(self.set.add(o))
-            self.failIf(self.set.add(o))
-            self.failUnless(o in self.set)
-        self.assertEqual(len(self.set), 100)
-    
-    def testInsertUnregistered(self):
-        from pypes.exceptions import IdentityError
-        self.assertRaises(IdentityError, self.set.add, TestClass())
-        
-    def testRemove(self):
-        objs = []
-        for i in xrange(100):
-            o = self._newObj()
-            self.set.add(o)
-            objs.append(o)
-        while objs:
-            o = objs.pop()
-            self.set.remove(o)
-            self.failIf(o in self.set)
-            self.assertEqual(len(objs), len(self.set))
-
-    def testRemoveNonMember(self):
-        from pypes.exceptions import SetLookupError
-        self.assertRaises(SetLookupError, self.set.remove, self._newObj())
-        
-    def testUpdate(self):
-        objs = [self._newObj() for i in xrange(100)]
-        self.set.update(objs)
-        for o in objs:
-            self.failUnless(o in self.set)
-        objs2 = [self._newObj() for i in xrange(100)]
-        self.set.update(objs2)
-        for o in objs + objs2:
-            self.failUnless(o in self.set)
-    
-    def testUpdateDupes(self):
-        objs = [self._newObj() for i in xrange(100)]
-        self.set.update(objs)
-        objs += [self._newObj() for i in xrange(100)]
-        objs.sort()
-        self.set.update(objs)
-        #self.assertEqual(len(self.set), 20)
-    
-    def testUpdateUnregistered(self):
-        from pypes.exceptions import IdentityError
-        objs = [self._newObj() for i in xrange(10)]
-        objs += [TestClass() for i in xrange(10)]
-        self.assertRaises(IdentityError, self.set.update, objs)
-    
-    def testIter(self):
-        objs = {}
-        for i in xrange(100):
-            o = self._newObj()
-            objs[o] = None
-        self.set.update(objs.keys())
-        for o in self.set:
-            del objs[o]
-        self.failIf(objs)
-    
-    def testIterUnregisteredYieldsNone(self):
-        from pypes.services import identity
-        o = self._newObj()
-        self.set.add(o)
-        identity(self.conn).remove(o)
-        self.failUnless(iter(self.set).next() is None)
-    
-    def testUnion(self):
-        from pypes.identity import IdentitySet
-        objs1 = [self._newObj() for i in xrange(10)]
-        objs2 = [self._newObj() for i in xrange(10)]
-        all = objs1 + objs2
-        objs2 += objs1[:3]
-        objs1 += objs2[:3]
-        set1 = IdentitySet(objs1)
-        set2 = IdentitySet(objs2)
-        union = set1.union(set2)
-        self.failIf(union is set1)
-        self.failIf(union is set2)
-        self.failUnless(len(union), 20)
-        for o in all:
-            self.failUnless(o in union)
-
-    def testOrUnion(self):
-        from pypes.identity import IdentitySet
-        objs1 = [self._newObj() for i in xrange(10)]
-        objs2 = [self._newObj() for i in xrange(10)]
-        expected = objs1 + objs2
-        objs2 += objs1[:3]
-        objs1 += objs2[:3]
-        set1 = IdentitySet(objs1)
-        set2 = IdentitySet(objs2)
-        union = set1 | set2
-        self.failIf(union is set1)
-        self.failIf(union is set2)
-        self.failUnless(len(union), len(expected))
-        for o in expected:
-            self.failUnless(o in union)   
-            
-    def testDifference(self):
-        from pypes.identity import IdentitySet
-        objs1 = [self._newObj() for i in xrange(10)]
-        objs2 = [self._newObj() for i in xrange(10)]
-        objs2 += objs1[-3:]
-        expected = objs1[:-3]
-        set1 = IdentitySet(objs1)
-        set2 = IdentitySet(objs2)
-        diff = set1.difference(set2)
-        self.failIf(diff is set1)
-        self.failIf(diff is set2)
-        self.failUnless(len(diff), len(expected))
-        for o in expected:
-            self.failUnless(o in diff)
-    
-    def testMinusDifference(self):
-        from pypes.identity import IdentitySet
-        objs1 = [self._newObj() for i in xrange(10)]
-        objs2 = [self._newObj() for i in xrange(10)]
-        objs2 += objs1[-3:]
-        expected = objs1[:-3]
-        set1 = IdentitySet(objs1)
-        set2 = IdentitySet(objs2)
-        diff = set1 - set2
-        self.failIf(diff is set1)
-        self.failIf(diff is set2)
-        self.failUnless(len(diff), len(expected))
-        for o in expected:
-            self.failUnless(o in diff)
-            
-    def testDiffNoOverlap(self):
-        from pypes.identity import IdentitySet
-        set1 = IdentitySet([self._newObj() for i in xrange(10)])
-        set2 = IdentitySet([self._newObj() for i in xrange(10)])
-        self.failUnless(set1.difference(set2) == set1)
-            
-    def testIntersection(self):
-        from pypes.identity import IdentitySet
-        objs1 = [self._newObj() for i in xrange(10)]
-        objs2 = [self._newObj() for i in xrange(10)]
-        objs2 += objs1[-4:]
-        expected = objs1[-4:]
-        set1 = IdentitySet(objs1)
-        set2 = IdentitySet(objs2)
-        sect = set1.intersection(set2)
-        self.failIf(sect is set1)
-        self.failIf(sect is set2)
-        self.failUnless(len(sect), len(expected))
-        for o in expected:
-            self.failUnless(o in sect)
-        
-    def testAndIntersection(self):
-        from pypes.identity import IdentitySet
-        objs1 = [self._newObj() for i in xrange(10)]
-        objs2 = [self._newObj() for i in xrange(10)]
-        objs2 += objs1[-4:]
-        expected = objs1[-4:]
-        set1 = IdentitySet(objs1)
-        set2 = IdentitySet(objs2)
-        sect = set1 & set2
-        self.failIf(sect is set1)
-        self.failIf(sect is set2)
-        self.failUnless(len(sect), len(expected))
-        for o in expected:
-            self.failUnless(o in sect)
-
-    def testEmptyIntersection(self):
-        from pypes.identity import IdentitySet
-        set1 = IdentitySet([self._newObj() for i in xrange(10)])
-        set2 = IdentitySet([self._newObj() for i in xrange(10)])
-        self.failIf(set1.intersection(set2))
-                
-    def testIsSuperAndSubset(self):
-        from pypes.identity import IdentitySet
-        objs1 = [self._newObj() for i in xrange(10)]
-        objs2 = objs1[2:-2]
-        set1 = IdentitySet(objs1)
-        set2 = IdentitySet(objs2)
-        self.failUnless(set2.issubset(set1))
-        self.failIf(set1.issubset(set2))
-        self.failUnless(set1.issuperset(set2))
-        self.failIf(set2.issuperset(set1))
-         
-    def testIsNotSuperAndSubset(self):
-        from pypes.identity import IdentitySet
-        objs1 = [self._newObj() for i in xrange(10)]
-        objs2 = [self._newObj() for i in xrange(10)]
-        objs2 += objs1[2:-2]
-        set1 = IdentitySet(objs1)
-        set2 = IdentitySet(objs2)
-        self.failIf(set2.issubset(set1))
-        self.failIf(set1.issubset(set2))
-        self.failIf(set1.issuperset(set2))    
-        self.failIf(set2.issuperset(set1))
-    
-    def testEquivilanceIsSuperAndSubset(self):
-        from pypes.identity import IdentitySet
-        objs = [self._newObj() for i in xrange(10)]
-        set1 = IdentitySet(objs)
-        set2 = IdentitySet(objs)
-        self.failUnless(set1.issubset(set2))
-        self.failUnless(set2.issubset(set1))
-        self.failUnless(set1.issuperset(set2))
-        self.failUnless(set2.issuperset(set1))
-        self.assertEqual(set1, set2)
-
-class TestIdentitySetPersistence(PypesPersistentTest):
-    
-    def testSetPersistence(self):
-        from pypes.identity import IdentitySet
-        self.root['objs'] = [self._newObj() for i in range(10)]
-        set = IdentitySet(self.root['objs'])
-        self.root['set'] = set
-        get_transaction().commit()
-        
-        r2 = self.db.open().root()
-        for obj in r2['objs']:
-            self.failUnless(obj in r2['set'])   
-        
-    def testConcurrentAdd(self):
-        from pypes.identity import IdentitySet
-        
-        set1 = self.root['set'] = IdentitySet()
-        get_transaction().commit()
-        
-        r2 = self.db.open().root()
-        set2 = r2['set']
-        list(set2) # Make sure its loaded
-        
-        set1.add(self._newObj())
-        get_transaction().commit()
-        
-        set2.add(self._newObj())
-        get_transaction().commit()
-        
-        set3 = self.db.open().root()['set']
-        self.assertEqual(len(set3), 2)  
-    
-    def testDuplicateAddFails(self):
-        from pypes.identity import IdentitySet
-        set1 = self.root['set'] = IdentitySet()
-        get_transaction().commit()
-        
-        r2 = self.db.open().root()
-        set2 = r2['set']
-        list(set2) # Make sure its loaded
-        
-        obj = self._newObj()
-        set1.add(obj)
-        get_transaction().commit()
-        
-        set2.add(obj)
-        self.assertRaises(ConflictError, get_transaction().commit)
-        
-        set3 = self.db.open().root()['set']
-        self.assertEqual(len(set3), 1)
-        
-    def testConcurrentUpdate(self):
-        from pypes.identity import IdentitySet
-        set1 = self.root['set'] = IdentitySet()
-        get_transaction().commit()
-        
-        r2 = self.db.open().root()
-        set2 = r2['set']
-        list(set2) # Make sure its loaded
-        
-        set1.update([self._newObj() for i in range(10)])
-        get_transaction().commit()
-        
-        set2.update([self._newObj() for i in range(10)])
-        get_transaction().commit()
-        
-        set3 = self.db.open().root()['set']
-        self.assertEqual(len(set3), 20)
-        
-if __name__ == '__main__':
-    unittest.main()
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Identity service tests
+
+$Id$"""
+
+import ZODB
+from ZODB.POSException import ConflictError
+import unittest
+from random import randint
+from zope.interface.verify import verifyObject
+from persistent import Persistent
+from BTrees.OOBTree import OOTreeSet, difference
+from types import TypeType
+from common \
+    import PypesTestCase, TestClass, PersistenceTest, PypesPersistentTest
+
+class TestIdentityService(unittest.TestCase):  # exercises a bare IdentityService instance
+    
+    def setUp(self):  # every test starts with a fresh, empty service
+        from pypes.identity import IdentityService
+        self.identity = IdentityService()
+
+    def testInterfaces(self):  # the service must satisfy both declared interfaces
+        from pypes.interfaces import IPersistentService, IIdentityService
+        self.failUnless(verifyObject(IPersistentService, self.identity))
+        self.failUnless(verifyObject(IIdentityService, self.identity))
+        
+    def testIdTypeReturnsType(self):  # idType() returns a type object
+        self.failUnless(isinstance(self.identity.idType(), TypeType))
+        
+    def testIdIsCorrectType(self):  # register() returns an instance of idType()
+        ident = self.identity.register(TestClass())
+        self.failUnless(isinstance(ident, self.identity.idType()))
+
+    def testIdIsTrue(self):  # a returned id is never a false value
+        self.failUnless(self.identity.register(TestClass()))
+        
+    def testIdsUniqueAndOrderable(self):  # 1000 ids fit into an ordered set without collapsing
+        ids = OOTreeSet()
+        register = self.identity.register
+        for i in xrange(1000):
+            self.failUnless(ids.insert(register(TestClass())))  # presumably insert() is false for a duplicate key -- see BTrees docs
+        self.assertEqual(len(ids), 1000)
+    
+    def testRegisterFailure(self):  # None cannot be registered
+        from pypes.exceptions import IdentityError
+        self.assertRaises(
+            IdentityError, self.identity.register, None)
+            
+    def testDuplicateRegistrationFails(self):  # registering the same object twice raises
+        from pypes.exceptions import IdentityError
+        o = TestClass()
+        self.identity.register(o)
+        self.assertRaises(
+            IdentityError, self.identity.register, o)
+        
+    def testHasId(self):  # hasId() is true for every id handed out
+        ids = []
+        register = self.identity.register
+        for i in xrange(1000):
+            ids.append(register(TestClass()))
+        ids.sort()
+        hasId = self.identity.hasId
+        for i in ids:
+            self.failUnless(hasId(i))
+            
+    def testNotHasId(self):  # hasId() must agree with the set of ids this service issued
+        from pypes.identity import IdentityService
+        altidentity = IdentityService()
+        register = self.identity.register
+        ids = OOTreeSet()
+        for i in xrange(500):
+            ids.insert(register(TestClass()))
+        hasId = self.identity.hasId
+        for i in xrange(500):
+            ident = altidentity.register(TestClass())  # id minted by a different service
+            self.failUnless(
+                bool(hasId(ident)) == bool(ids.has_key(ident)))  # compared with real membership, not assumed False
+        self.failIf(hasId(0))
+    
+    def testHasIdWrongType(self):  # None, a class and an instance are all rejected with TypeError
+        self.assertRaises(TypeError, self.identity.hasId, None)
+        self.assertRaises(TypeError, self.identity.hasId, TestClass)
+        self.assertRaises(TypeError, self.identity.hasId, TestClass())
+            
+    def testContains(self):  # "obj in service" is true for registered objects
+        objs = []
+        register = self.identity.register
+        for i in xrange(1000):
+            o = TestClass()
+            objs.append(o)
+            register(o)
+        while objs:
+            o = objs.pop(randint(0, len(objs) - 1))  # check the objects in random order
+            self.failUnless(o in self.identity)
+    
+    def testNotContains(self):  # unregistered objects are not contained, before or after other registrations
+        register = self.identity.register
+        for i in xrange(333):
+            self.failIf(TestClass() in self.identity)
+        for i in xrange(333):
+            register(TestClass())
+        for i in xrange(333):
+            self.failIf(TestClass() in self.identity)
+        
+    def testRemoveId(self):  # removal by id; a second removal raises IdentityKeyError
+        from pypes.exceptions import IdentityKeyError
+        from pypes.identity import pypesid
+        o = TestClass()
+        self.failIf(o in self.identity)
+        ident = self.identity.register(o)
+        self.failUnless(o in self.identity)
+        self.identity.removeId(ident)
+        self.failIf(o in self.identity)
+        self.assertRaises(IdentityKeyError, self.identity.removeId, ident)
+        self.failUnless(pypesid(o) is None)  # the object no longer reports an id
+        
+    def testRemoveObject(self):  # removal by object; a second removal raises IdentityError
+        from pypes.exceptions import IdentityError
+        from pypes.identity import pypesid
+        o = TestClass()
+        self.failIf(o in self.identity)
+        self.identity.register(o)
+        self.failUnless(o in self.identity)
+        self.identity.remove(o)
+        self.failIf(o in self.identity)
+        self.assertRaises(IdentityError, self.identity.remove, o)
+        self.failUnless(pypesid(o) is None)  # the object no longer reports an id
+    
+    def testGetObject(self):  # getObject(id) returns the very object registered under that id
+        ids = []
+        objs = {}
+        register = self.identity.register
+        for i in xrange(1000):
+            o = TestClass()
+            ident = register(o)
+            ids.append(ident)
+            objs[ident] = o
+            o.id = ident  # remember the id on the object for the equality check below
+        ids.sort()
+        getObject = self.identity.getObject
+        for ident in ids:
+            self.assertEqual(getObject(ident).id, ident)
+            self.failUnless(getObject(ident) is objs[ident])
+    
+    def testGetObjectFails(self):  # an id this service never issued raises IdentityKeyError
+        from pypes.identity import IdentityService
+        from pypes.exceptions import IdentityKeyError
+        altidentity = IdentityService()
+        ident = self.identity.register(TestClass())
+        badident = altidentity.register(TestClass())
+        while ident == badident:  # keep drawing until the foreign id differs from ours
+            badident = altidentity.register(TestClass())
+        self.assertRaises(IdentityKeyError, self.identity.getObject, badident)
+    
+    def testLen(self):  # len() follows registrations and removals
+        self.failIf(self.identity)  # an empty service is a false value
+        ident = self.identity.register(TestClass())
+        self.assertEqual(len(self.identity), 1)
+        self.identity.removeId(ident)
+        self.assertEqual(len(self.identity), 0)
+        for i in xrange(100):
+            self.identity.register(TestClass())
+            self.assertEqual(len(self.identity), i + 1)
+
+    def testIterate(self):  # iterating the service yields each registered object exactly once
+        objs = {}
+        register = self.identity.register
+        for i in xrange(1000):
+            o = TestClass()
+            objs[o] = register(o)
+        for o in self.identity:
+            del objs[o]  # KeyError here means an unknown or repeated object
+        self.failIf(objs)
+        
+    def testIterateIds(self):  # iterIds() yields each issued id exactly once
+        ids = {}
+        register = self.identity.register
+        for i in xrange(1000):
+            ids[register(TestClass())] = None
+        for i in self.identity.iterIds():
+            del ids[i]  # KeyError here means an unknown or repeated id
+        self.failIf(ids)
+        
+    def testIdSet(self):  # idSet() is empty at first and later holds only issued ids
+        self.failIf(self.identity.idSet())
+        ids = OOTreeSet()
+        register = self.identity.register
+        for i in xrange(100):
+            ids.insert(register(TestClass()))
+        self.failIf(difference(OOTreeSet(self.identity.idSet()), ids))  # NOTE(review): one-directional; ids missing from idSet() would go unnoticed
+        
+class TestListener(object):  # records every message passed to receive()
+    
+    def __init__(self):
+        self.received = []  # messages in the order they arrived
+    
+    def receive(self, msg):  # the tests register this method by name as the listener callback
+        self.received.append(msg)
+        
+class TestIdentityServiceEvents(PypesTestCase):  # messages sent when ids are registered and removed
+    
+    def testIdRegisteredEvent(self):  # one registration delivers one IdRegisteredMessage
+        from pypes import services
+        from pypes.identity import pypesid, IdRegisteredMessage
+        event = services.event(self.conn)
+        tl = TestListener()
+        event.registerListener(tl, 'receive', IdRegisteredMessage)
+        o = self._newObj()  # inherited helper (not shown here); the assertions expect it to register o
+        msg, = tl.received  # unpacking fails unless exactly one message arrived
+        self.failUnless(msg.object is o)
+        self.failUnless(msg.id is pypesid(o))        
+
+    def testIdRegisteredEventMultiple(self):  # ten registrations, ten messages; removals add none
+        from pypes import services
+        from pypes.identity import IdRegisteredMessage
+        event = services.event(self.conn)
+        tl = TestListener()
+        event.registerListener(tl, 'receive', IdRegisteredMessage)
+        objs = [self._newObj() for i in range(10)]
+        self.assertEqual(len(tl.received), 10)
+        for o in objs:
+            self.identity.remove(o)  # self.identity is not set in this class; presumably by PypesTestCase -- confirm
+        self.assertEqual(len(tl.received), 10)  # still 10: removal is not a "registered" event
+    
+    def testIdUnregisteredEvent(self):  # one removal delivers one IdUnregisteredMessage
+        from pypes import services
+        from pypes.identity import pypesid, IdUnregisteredMessage
+        event = services.event(self.conn)
+        tl = TestListener()
+        event.registerListener(tl, 'receive', IdUnregisteredMessage)
+        o = self._newObj()
+        id = pypesid(o)  # captured before removal so it can be compared with the message
+        self.assertEqual(len(tl.received), 0)  # registration alone sends nothing to this listener
+        self.identity.remove(o)
+        msg, = tl.received  # unpacking fails unless exactly one message arrived
+        self.failUnless(msg.object is o)
+        self.failUnless(msg.id is id) 
+
+    def testIdUnregisteredEventMultiple(self):  # ten removals, ten messages; registrations add none
+        from pypes import services
+        from pypes.identity import IdUnregisteredMessage
+        event = services.event(self.conn)
+        tl = TestListener()
+        event.registerListener(tl, 'receive', IdUnregisteredMessage)
+        objs = [self._newObj() for i in range(10)]
+        self.assertEqual(len(tl.received), 0)
+        for o in objs:
+            self.identity.remove(o)
+        self.assertEqual(len(tl.received), 10)
+    
+    def testListenForIdEventsFunc(self):  # listenForIdEvents() hooks up both message types at once
+        from pypes import services
+        from pypes.identity import listenForIdEvents
+        from pypes.identity import IdRegisteredMessage, IdUnregisteredMessage
+        event = services.event(self.conn)  # NOTE(review): result is not used below -- confirm whether the call is needed
+        tl = TestListener()
+        listenForIdEvents(tl, self.conn, 'receive', 'receive')  # same method name given for both callbacks
+        self.assertEqual(len(tl.received), 0)
+        obj = self._newObj()
+        self.assertEqual(len(tl.received), 1)
+        self.identity.remove(obj)
+        msg1, msg2 = tl.received        
+        self.failUnless(msg1.object is obj)
+        self.failUnless(isinstance(msg1, IdRegisteredMessage))
+        self.failUnless(msg2.object is obj)
+        self.failUnless(isinstance(msg2, IdUnregisteredMessage))
+        
+class TestIdentityServicePersistence(PersistenceTest):
+    """Exercise an IdentityService stored in the database root."""
+
+    def testBasicPersistence(self):
+        """A registered object can be fetched through a second connection."""
+        from pypes.identity import IdentityService, pypesid
+        r1 = self.db.open().root()
+        ids = r1['t'] = IdentityService()
+        obj = TestClass()
+        obj.name = 'test'
+        objid = ids.register(obj)
+        get_transaction().commit()
+
+        r2 = self.db.open().root()
+        ids2 = r2['t']
+        obj = ids2.getObject(objid)
+        # Both connections must see the same committed revision of the
+        # service, so compare the first copy against the second one.
+        self.assertEqual(ids._p_serial, ids2._p_serial)
+        self.failUnless(isinstance(obj, TestClass))
+        self.assertEqual(obj.name, 'test')
+        self.assertEqual(pypesid(obj), objid)
+
+    def testConcurrentRegistration(self):
+        """Register through two connections in turn; each commit must pass.
+
+        There is no explicit assertion: the test fails only if one of the
+        calls raises.
+        """
+        from pypes.identity import IdentityService
+        r1 = self.db.open().root()
+        ids1 = r1['t'] = IdentityService()
+        get_transaction().commit()
+
+        r2 = self.db.open().root()
+        ids2 = r2['t']
+        # Iterate the second copy so that it is loaded before the first
+        # connection commits its registration.
+        list(ids2)
+        list(ids2.iterIds())
+
+        ids1.register(TestClass())
+        get_transaction().commit()
+
+        ids2.register(TestClass())
+        get_transaction().commit()
+        
+class TestIdentitySet(PypesTestCase):
+    """Tests for IdentitySet construction, mutation and set operations."""
+
+    def setUp(self):
+        """Run the base setUp and put an empty IdentitySet in self.set."""
+        from pypes.identity import IdentitySet
+        PypesTestCase.setUp(self)
+        self.set = IdentitySet()
+
+    def testInterfaces(self):
+        """The set must verify against IIdentitySet."""
+        from pypes.interfaces import IIdentitySet
+        self.failUnless(verifyObject(IIdentitySet, self.set))
+
+    def testFromUniqueSequence(self):
+        """A set built from distinct objects contains each of them."""
+        from pypes.identity import IdentitySet
+        objs = [self._newObj() for i in xrange(100)]
+        idset = IdentitySet(objs)
+        for o in objs:
+            self.failUnless(o in idset)
+
+    def testFromNonUniqueSequence(self):
+        """Duplicates in the source sequence collapse to single members."""
+        from pypes.identity import IdentitySet
+        objs = [self._newObj() for i in xrange(10)]
+        # Double the list, then cut a slice out of the middle so that some
+        # objects appear twice and others only once.
+        objs.extend(objs)
+        del objs[7:13]
+        idset = IdentitySet(objs)
+        seen = {}
+        for o in objs:
+            self.failIf(o not in idset)
+            seen[o] = None
+        seen = seen.keys()
+        seen.sort()
+        members = list(idset)
+        members.sort()
+        self.assertEqual(seen, members)
+
+    def testAddUnique(self):
+        """add() returns true for new members and the length tracks them."""
+        objs = []
+        for i in xrange(100):
+            o = self._newObj()
+            self.failUnless(self.set.add(o))
+            objs.append(o)
+        for o in objs:
+            self.failUnless(o in self.set)
+        self.assertEqual(len(objs), len(self.set))
+
+    def testAddDupes(self):
+        """A second add() of the same object returns false and adds nothing."""
+        for i in xrange(100):
+            o = self._newObj()
+            self.failUnless(self.set.add(o))
+            self.failIf(self.set.add(o))
+            self.failUnless(o in self.set)
+        self.assertEqual(len(self.set), 100)
+
+    def testInsertUnregistered(self):
+        """Adding a bare TestClass() instance must raise IdentityError."""
+        from pypes.exceptions import IdentityError
+        self.assertRaises(IdentityError, self.set.add, TestClass())
+
+    def testRemove(self):
+        """Removing members one at a time shrinks the set accordingly."""
+        objs = []
+        for i in xrange(100):
+            o = self._newObj()
+            self.set.add(o)
+            objs.append(o)
+        while objs:
+            o = objs.pop()
+            self.set.remove(o)
+            self.failIf(o in self.set)
+            self.assertEqual(len(objs), len(self.set))
+
+    def testRemoveNonMember(self):
+        """remove() of an object not in the set raises SetLookupError."""
+        from pypes.exceptions import SetLookupError
+        self.assertRaises(SetLookupError, self.set.remove, self._newObj())
+
+    def testUpdate(self):
+        """Successive update() calls accumulate members."""
+        objs = [self._newObj() for i in xrange(100)]
+        self.set.update(objs)
+        for o in objs:
+            self.failUnless(o in self.set)
+        objs2 = [self._newObj() for i in xrange(100)]
+        self.set.update(objs2)
+        for o in objs + objs2:
+            self.failUnless(o in self.set)
+
+    def testUpdateDupes(self):
+        """update() accepts a sequence that repeats current members."""
+        objs = [self._newObj() for i in xrange(100)]
+        self.set.update(objs)
+        objs += [self._newObj() for i in xrange(100)]
+        objs.sort()
+        self.set.update(objs)
+        for o in objs:
+            self.failUnless(o in self.set)
+        # NOTE(review): a disabled length assertion used to follow here with
+        # an expected value of 20, which does not match the 200 objects
+        # created above -- confirm the intended length check and restore it.
+
+    def testUpdateUnregistered(self):
+        """update() with bare TestClass() instances raises IdentityError."""
+        from pypes.exceptions import IdentityError
+        objs = [self._newObj() for i in xrange(10)]
+        objs += [TestClass() for i in xrange(10)]
+        self.assertRaises(IdentityError, self.set.update, objs)
+
+    def testIter(self):
+        """Iteration yields every member exactly once."""
+        objs = {}
+        for i in xrange(100):
+            o = self._newObj()
+            objs[o] = None
+        self.set.update(objs.keys())
+        # Deleting raises KeyError if a member is yielded twice or is
+        # unknown; anything left over was never yielded.
+        for o in self.set:
+            del objs[o]
+        self.failIf(objs)
+
+    def testIterUnregisteredYieldsNone(self):
+        """A member removed from the identity service iterates as None."""
+        from pypes.services import identity
+        o = self._newObj()
+        self.set.add(o)
+        identity(self.conn).remove(o)
+        self.failUnless(iter(self.set).next() is None)
+
+    def testUnion(self):
+        """union() returns a new set holding the members of both sets."""
+        from pypes.identity import IdentitySet
+        objs1 = [self._newObj() for i in xrange(10)]
+        objs2 = [self._newObj() for i in xrange(10)]
+        expected = objs1 + objs2
+        # Give the two inputs three members in common in each direction.
+        objs2 += objs1[:3]
+        objs1 += objs2[:3]
+        set1 = IdentitySet(objs1)
+        set2 = IdentitySet(objs2)
+        union = set1.union(set2)
+        self.failIf(union is set1)
+        self.failIf(union is set2)
+        self.assertEqual(len(union), len(expected))
+        for o in expected:
+            self.failUnless(o in union)
+
+    def testOrUnion(self):
+        """The | operator returns a new set holding members of both sets."""
+        from pypes.identity import IdentitySet
+        objs1 = [self._newObj() for i in xrange(10)]
+        objs2 = [self._newObj() for i in xrange(10)]
+        expected = objs1 + objs2
+        objs2 += objs1[:3]
+        objs1 += objs2[:3]
+        set1 = IdentitySet(objs1)
+        set2 = IdentitySet(objs2)
+        union = set1 | set2
+        self.failIf(union is set1)
+        self.failIf(union is set2)
+        self.assertEqual(len(union), len(expected))
+        for o in expected:
+            self.failUnless(o in union)
+
+    def testDifference(self):
+        """difference() returns a new set without the shared members."""
+        from pypes.identity import IdentitySet
+        objs1 = [self._newObj() for i in xrange(10)]
+        objs2 = [self._newObj() for i in xrange(10)]
+        objs2 += objs1[-3:]
+        expected = objs1[:-3]
+        set1 = IdentitySet(objs1)
+        set2 = IdentitySet(objs2)
+        diff = set1.difference(set2)
+        self.failIf(diff is set1)
+        self.failIf(diff is set2)
+        self.assertEqual(len(diff), len(expected))
+        for o in expected:
+            self.failUnless(o in diff)
+
+    def testMinusDifference(self):
+        """The - operator returns a new set without the shared members."""
+        from pypes.identity import IdentitySet
+        objs1 = [self._newObj() for i in xrange(10)]
+        objs2 = [self._newObj() for i in xrange(10)]
+        objs2 += objs1[-3:]
+        expected = objs1[:-3]
+        set1 = IdentitySet(objs1)
+        set2 = IdentitySet(objs2)
+        diff = set1 - set2
+        self.failIf(diff is set1)
+        self.failIf(diff is set2)
+        self.assertEqual(len(diff), len(expected))
+        for o in expected:
+            self.failUnless(o in diff)
+
+    def testDiffNoOverlap(self):
+        """The difference of disjoint sets equals the first set."""
+        from pypes.identity import IdentitySet
+        set1 = IdentitySet([self._newObj() for i in xrange(10)])
+        set2 = IdentitySet([self._newObj() for i in xrange(10)])
+        self.failUnless(set1.difference(set2) == set1)
+
+    def testIntersection(self):
+        """intersection() returns a new set of just the shared members."""
+        from pypes.identity import IdentitySet
+        objs1 = [self._newObj() for i in xrange(10)]
+        objs2 = [self._newObj() for i in xrange(10)]
+        objs2 += objs1[-4:]
+        expected = objs1[-4:]
+        set1 = IdentitySet(objs1)
+        set2 = IdentitySet(objs2)
+        sect = set1.intersection(set2)
+        self.failIf(sect is set1)
+        self.failIf(sect is set2)
+        self.assertEqual(len(sect), len(expected))
+        for o in expected:
+            self.failUnless(o in sect)
+
+    def testAndIntersection(self):
+        """The & operator returns a new set of just the shared members."""
+        from pypes.identity import IdentitySet
+        objs1 = [self._newObj() for i in xrange(10)]
+        objs2 = [self._newObj() for i in xrange(10)]
+        objs2 += objs1[-4:]
+        expected = objs1[-4:]
+        set1 = IdentitySet(objs1)
+        set2 = IdentitySet(objs2)
+        sect = set1 & set2
+        self.failIf(sect is set1)
+        self.failIf(sect is set2)
+        self.assertEqual(len(sect), len(expected))
+        for o in expected:
+            self.failUnless(o in sect)
+
+    def testEmptyIntersection(self):
+        """The intersection of disjoint sets is false in a boolean test."""
+        from pypes.identity import IdentitySet
+        set1 = IdentitySet([self._newObj() for i in xrange(10)])
+        set2 = IdentitySet([self._newObj() for i in xrange(10)])
+        self.failIf(set1.intersection(set2))
+
+    def testIsSuperAndSubset(self):
+        """A set built from a slice is a subset, and only that way round."""
+        from pypes.identity import IdentitySet
+        objs1 = [self._newObj() for i in xrange(10)]
+        objs2 = objs1[2:-2]
+        set1 = IdentitySet(objs1)
+        set2 = IdentitySet(objs2)
+        self.failUnless(set2.issubset(set1))
+        self.failIf(set1.issubset(set2))
+        self.failUnless(set1.issuperset(set2))
+        self.failIf(set2.issuperset(set1))
+
+    def testIsNotSuperAndSubset(self):
+        """Overlapping sets with extra members each way are neither."""
+        from pypes.identity import IdentitySet
+        objs1 = [self._newObj() for i in xrange(10)]
+        objs2 = [self._newObj() for i in xrange(10)]
+        objs2 += objs1[2:-2]
+        set1 = IdentitySet(objs1)
+        set2 = IdentitySet(objs2)
+        self.failIf(set2.issubset(set1))
+        self.failIf(set1.issubset(set2))
+        self.failIf(set1.issuperset(set2))
+        self.failIf(set2.issuperset(set1))
+
+    def testEquivilanceIsSuperAndSubset(self):
+        """Sets with the same members are equal, sub- and supersets."""
+        from pypes.identity import IdentitySet
+        objs = [self._newObj() for i in xrange(10)]
+        set1 = IdentitySet(objs)
+        set2 = IdentitySet(objs)
+        self.failUnless(set1.issubset(set2))
+        self.failUnless(set2.issubset(set1))
+        self.failUnless(set1.issuperset(set2))
+        self.failUnless(set2.issuperset(set1))
+        self.assertEqual(set1, set2)
+
+class TestIdentitySetPersistence(PypesPersistentTest):
+    """IdentitySet behaviour across commits and separate connections."""
+
+    def _storeEmptySet(self):
+        """Commit a new empty IdentitySet as root['set'] and return it."""
+        from pypes.identity import IdentitySet
+        stored = self.root['set'] = IdentitySet()
+        get_transaction().commit()
+        return stored
+
+    def _setInNewConnection(self):
+        """Return root['set'] as seen through a newly opened connection."""
+        return self.db.open().root()['set']
+
+    def testSetPersistence(self):
+        """Members committed with the set are found via a new connection."""
+        from pypes.identity import IdentitySet
+        self.root['objs'] = [self._newObj() for n in range(10)]
+        self.root['set'] = IdentitySet(self.root['objs'])
+        get_transaction().commit()
+
+        other_root = self.db.open().root()
+        for member in other_root['objs']:
+            self.failUnless(member in other_root['set'])
+
+    def testConcurrentAdd(self):
+        """Adds of different objects through two copies both commit."""
+        first = self._storeEmptySet()
+        second = self._setInNewConnection()
+        list(second)  # iterate so the second copy is loaded
+
+        for target in (first, second):
+            target.add(self._newObj())
+            get_transaction().commit()
+
+        self.assertEqual(len(self._setInNewConnection()), 2)
+
+    def testDuplicateAddFails(self):
+        """Adding one object through both copies raises ConflictError."""
+        first = self._storeEmptySet()
+        second = self._setInNewConnection()
+        list(second)  # iterate so the second copy is loaded
+
+        shared = self._newObj()
+        first.add(shared)
+        get_transaction().commit()
+
+        second.add(shared)
+        self.assertRaises(ConflictError, get_transaction().commit)
+
+        self.assertEqual(len(self._setInNewConnection()), 1)
+
+    def testConcurrentUpdate(self):
+        """Updates with different objects through two copies both commit."""
+        first = self._storeEmptySet()
+        second = self._setInNewConnection()
+        list(second)  # iterate so the second copy is loaded
+
+        for target in (first, second):
+            target.update([self._newObj() for n in range(10)])
+            get_transaction().commit()
+
+        self.assertEqual(len(self._setInNewConnection()), 20)
+        
+# Run the tests in this module when it is executed directly as a script.
+if __name__ == '__main__':
+    unittest.main()




More information about the Zope-CVS mailing list