[Zope-CVS] CVS: Packages/pypes/pypes/tests - common.py:1.8 test_query.py:1.10

Casey Duncan casey at zope.com
Wed May 12 01:10:38 EDT 2004


Update of /cvs-repository/Packages/pypes/pypes/tests
In directory cvs.zope.org:/tmp/cvs-serv1583/tests

Modified Files:
	common.py test_query.py 
Log Message:
Implement "in" join primitive
Basic iterResult() method implementation for Join


=== Packages/pypes/pypes/tests/common.py 1.7 => 1.8 ===
--- Packages/pypes/pypes/tests/common.py:1.7	Wed Mar 10 16:18:03 2004
+++ Packages/pypes/pypes/tests/common.py	Wed May 12 01:10:36 2004
@@ -97,8 +97,8 @@
         self.identity = pypes.services.identity(self.conn)
         get_transaction().commit()
     
-def sort(lst):
-    """Return a sorted copy of a list"""
-    l = lst[:]
+def sort(iterable):
+    """Return a sorted list from iterable"""
+    l = list(iterable)
     l.sort()
     return l


=== Packages/pypes/pypes/tests/test_query.py 1.9 => 1.10 ===
--- Packages/pypes/pypes/tests/test_query.py:1.9	Wed May  5 00:06:36 2004
+++ Packages/pypes/pypes/tests/test_query.py	Wed May 12 01:10:36 2004
@@ -333,12 +333,12 @@
         for r, l in join:
             self.assertEqual(l.big, r.big)
 
-    def test_greater_join(self):
-        from pypes.query import greater_join
+    def test_greaterjoin(self):
+        from pypes.query import greaterjoin
         us = [TestClass(age=14-i) for i in range(10)]
         them = [TestClass(age=i) for i in range(15)]
         expr = lambda x: x.age
-        joined = list(greater_join(us, expr, them, expr))
+        joined = list(greaterjoin(us, expr, them, expr))
         left2right = {}
         for left, right in joined:
             self.failUnless(left.age > right.age)
@@ -346,40 +346,73 @@
         for left, lesser in left2right.items():
             self.assertEqual(len(lesser), left.age)
     
-    def test_greater_join_empty(self):
-        from pypes.query import greater_join
-        joined = greater_join([], lambda x:x, [], lambda x:x)
+    def test_greaterjoin_empty(self):
+        from pypes.query import greaterjoin
+        joined = greaterjoin([], lambda x:x, [], lambda x:x)
         self.failIf(list(joined))
         objs = [TestClass() for i in range(5)]
-        joined = greater_join(objs, lambda x:x, [], lambda x:x)
+        joined = greaterjoin(objs, lambda x:x, [], lambda x:x)
         self.failIf(list(joined))
-        joined = greater_join([], lambda x:x, objs, lambda x:x)
+        joined = greaterjoin([], lambda x:x, objs, lambda x:x)
         self.failIf(list(joined))
     
-    def test_greater_join_none_greater(self):
-        from pypes.query import greater_join
+    def test_greaterjoin_none_greater(self):
+        from pypes.query import greaterjoin
         objs = [TestClass() for i in range(5)]
-        joined = greater_join(objs, lambda x: 1, objs, lambda x: 2)
+        joined = greaterjoin(objs, lambda x: 1, objs, lambda x: 2)
         self.failIf(list(joined))
 
+    def test_injoin(self):
+        from pypes.query import injoin
+        letters = ['a', 'b', 'c', 'd']
+        words = ['a', 'big', 'fat', 'hairy', 'deal']
+        expr = lambda x: x
+        joined = list(injoin(letters, expr, words, expr))
+        self.assertEqual(len(joined), 6)
+        for left, right in joined:
+            self.failUnless(left in right)
+            
+    def test_injoin_empty(self):
+        from pypes.query import injoin
+        joined = injoin([], lambda x:x, [], lambda x:x)
+        self.failIf(list(joined))
+        objs = ['' for i in range(5)]
+        joined = injoin(objs, lambda x:x, [], lambda x:x)
+        self.failIf(list(joined))
+        joined = injoin([], lambda x:x, objs, lambda x:x)
+        self.failIf(list(joined))
+    
+    def test_injoin_none_match(self):
+        from pypes.query import injoin
+        objs = [TestClass() for i in range(5)]
+        joined = injoin(objs, lambda x: 1, objs, lambda x: [])
+        self.failIf(list(joined))
+
+
 class TestJoin(unittest.TestCase):
     
+    def setUp(self):
+        from pypes.query import Join
+        self.inputs = {'a': Set(('AAA', 'BAA', 'AAC', 'AAD')),
+                       'b': Set(('A', 'B', 'D')),
+                       'c': Set (('AA', 'BB', 'CC'))}
+        self.criteria = DummyExpr(expr='a == b', names=['a', 'b'])
+        self.join = Join(self.inputs, self.criteria)
+    
     def test_interface(self):
         from pypes.query import Join
         from pypes.interfaces import IPartialResult
-        inputs = {'a':Set(), 'b':Set()}
         self.failUnless(verifyObject(
             IPartialResult, 
-            Join(inputs, DummyExpr(expr='a == b', names=['a', 'b']))))
+            Join(self.inputs, DummyExpr(expr='a == b', names=['a', 'b']))))
     
     def test_join_handles_compares(self):
         from pypes.query import Join
-        inputs = {'a':Set(), 'b':Set()}
-        Join(inputs, DummyExpr(expr='a == b', names=['a', 'b']))
-        Join(inputs, DummyExpr(expr='a < b', names=['a', 'b']))
-        Join(inputs, DummyExpr(expr='a > b', names=['a', 'b']))
-        Join(inputs, DummyExpr(expr='a in b', names=['a', 'b']))
-        Join(inputs, DummyExpr(
+        Join(self.inputs, DummyExpr(expr='a == b', names=['a', 'b']))
+        Join(self.inputs, DummyExpr(expr='a < b', names=['a', 'b']))
+        Join(self.inputs, DummyExpr(expr='a > b', names=['a', 'b']))
+        Join(self.inputs, DummyExpr(expr='a in b', names=['a', 'b']))
+        Join(self.inputs, DummyExpr(
             expr='a.__name__ == b["name"]', names=['a', 'b']))
 
     def test_join_cant_process(self):
@@ -414,6 +447,56 @@
         self.assertRaises(
             CantProcess, Join, inputs, 
             DummyExpr(expr='a <= b', names=['a', 'b']))
+        
+    def test_input_map(self):
+        self.assertEqual(self.join.inputMap(), self.inputs)
+        self.failIf(self.join.inputMap() is self.inputs)
+    
+    def test_criteria(self):
+        self.failUnless(self.join.criteriaExpr() is self.criteria)
+        
+    def test_names_used(self):
+        self.assertEqual(self.join.namesUsed(), Set(['a', 'b']))
+        
+    def test_select_joined_inputs(self):
+        from pypes.query import Join
+        j = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
+        expected = sort([(b, a) for a in self.inputs['a'] 
+                                for b in self.inputs['b']
+                                if a[-1] == b])
+        self.assertEqual(sort(j.iterResult('b', 'a')), expected)
+        j = Join(self.inputs, DummyExpr(expr='b*2 > c', names=['b', 'c']))
+        expected = sort([(b, c) for b in self.inputs['b'] 
+                                for c in self.inputs['c']
+                                if b*2 > c])
+        self.assertEqual(sort(j.iterResult('b', 'c')), expected)
+        j = Join(self.inputs, DummyExpr(expr='b in a', names=['a', 'b']))
+        expected = sort([(a, b) for a in self.inputs['a'] 
+                                for b in self.inputs['b']
+                                if b in a])
+        self.assertEqual(sort(j.iterResult('a', 'b')), expected)        
+    
+    def test_select_inputs_not_joined(self):
+        from pypes.query import Join
+        j = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
+        expected = sort([(a, c) for a in self.inputs['a'] 
+                                for b in self.inputs['b']
+                                for c in self.inputs['c']
+                                if a[-1] == b])
+        self.assertEqual(sort(j.iterResult('a', 'c')), expected)
+    
+    def test_union_joins_one_common_input(self):  # j1 is over a,b; j2 over a,c; 'a' is shared
+        from pypes.query import Join
+        j1 = Join(self.inputs, DummyExpr(expr='a[-1] == b', names=['a', 'b']))
+        j2 = Join(self.inputs, DummyExpr(expr='a[:2] == c', names=['a', 'c']))
+        union = j1.union(j2)
+        self.failIf(union is j1)
+        self.failIf(union is j2)
+        expected = sort([(a, b, c) for a in self.inputs['a'] 
+                                   for b in self.inputs['b']
+                                   for c in self.inputs['c']
+                                   if a[-1] == b or a[:2] == c])
+        self.assertEqual(sort(union), expected)
 
 if __name__ == '__main__':
     unittest.main()




More information about the Zope-CVS mailing list