[Zope-Checkins] SVN: Zope/trunk/lib/python/Testing/ZopeTestCase/ Made the ConnectionRegistry capable of storing OFS.Application objects

Stefan H. Holek stefan at epy.co.at
Wed May 11 12:03:26 EDT 2005


Log message for revision 30326:
  Made the ConnectionRegistry capable of storing OFS.Application objects
  and pushed responsibility for closing the REQUEST down to the registry.
  

Changed:
  U   Zope/trunk/lib/python/Testing/ZopeTestCase/PortalTestCase.py
  U   Zope/trunk/lib/python/Testing/ZopeTestCase/ZopeTestCase.py
  U   Zope/trunk/lib/python/Testing/ZopeTestCase/base.py
  U   Zope/trunk/lib/python/Testing/ZopeTestCase/connections.py
  U   Zope/trunk/lib/python/Testing/ZopeTestCase/sandbox.py
  U   Zope/trunk/lib/python/Testing/ZopeTestCase/testBaseTestCase.py
  U   Zope/trunk/lib/python/Testing/ZopeTestCase/testPortalTestCase.py

-=-
Modified: Zope/trunk/lib/python/Testing/ZopeTestCase/PortalTestCase.py
===================================================================
--- Zope/trunk/lib/python/Testing/ZopeTestCase/PortalTestCase.py	2005-05-11 14:06:08 UTC (rev 30325)
+++ Zope/trunk/lib/python/Testing/ZopeTestCase/PortalTestCase.py	2005-05-11 16:03:21 UTC (rev 30326)
@@ -24,7 +24,7 @@
 by the PortalTestCase class! Subclasses must make sure
 getPortal() returns a usable portal object to the setup code.
 
-$Id: PortalTestCase.py,v 1.38 2005/02/09 12:42:40 shh42 Exp $
+$Id$
 """
 
 import base
@@ -111,7 +111,7 @@
 
            Note: This method should not be called by tests!
         '''
-        return self.app[portal_name]
+        return getattr(self.app, portal_name)
 
     def createMemberarea(self, name):
         '''Creates a memberarea for the specified user.

Modified: Zope/trunk/lib/python/Testing/ZopeTestCase/ZopeTestCase.py
===================================================================
--- Zope/trunk/lib/python/Testing/ZopeTestCase/ZopeTestCase.py	2005-05-11 14:06:08 UTC (rev 30325)
+++ Zope/trunk/lib/python/Testing/ZopeTestCase/ZopeTestCase.py	2005-05-11 16:03:21 UTC (rev 30326)
@@ -64,7 +64,7 @@
     def _setupFolder(self):
         '''Creates and configures the folder.'''
         self.app.manage_addFolder(folder_name)
-        self.folder = self.app._getOb(folder_name)
+        self.folder = getattr(self.app, folder_name)
         self.folder._addRole(user_role)
         self.folder.manage_role(user_role, standard_permissions)
 
@@ -81,7 +81,7 @@
         '''Clears the fixture.'''
         # This code is a wart from the olden days.
         try:
-            if connections.contains(self.app._p_jar):
+            if connections.contains(self.app):
                 self.app._delObject(folder_name)
         except:
             pass

Modified: Zope/trunk/lib/python/Testing/ZopeTestCase/base.py
===================================================================
--- Zope/trunk/lib/python/Testing/ZopeTestCase/base.py	2005-05-11 14:06:08 UTC (rev 30325)
+++ Zope/trunk/lib/python/Testing/ZopeTestCase/base.py	2005-05-11 16:03:21 UTC (rev 30326)
@@ -16,7 +16,6 @@
 """
 
 import ZopeLite as Zope2
-
 import unittest
 import transaction
 import profiler
@@ -27,16 +26,17 @@
 from AccessControl.SecurityManagement import noSecurityManager
 
 
+
 def app():
     '''Opens a ZODB connection and returns the app object.'''
     app = Zope2.app()
-    connections.register(app._p_jar)
-    return utils.makerequest(app)
+    app = utils.makerequest(app)
+    connections.register(app)
+    return app
 
 def close(app):
     '''Closes the app's ZODB connection.'''
-    app.REQUEST.close()
-    connections.close(app._p_jar)
+    connections.close(app)
 
 
 
@@ -119,8 +119,6 @@
         '''Clears the fixture.'''
         if call_close_hook:
             self.beforeClose()
-        if connections.contains(self.app._p_jar):
-            self.app.REQUEST.close()
         self._close()
         self.logout()
         self.afterClear()

Modified: Zope/trunk/lib/python/Testing/ZopeTestCase/connections.py
===================================================================
--- Zope/trunk/lib/python/Testing/ZopeTestCase/connections.py	2005-05-11 14:06:08 UTC (rev 30325)
+++ Zope/trunk/lib/python/Testing/ZopeTestCase/connections.py	2005-05-11 16:03:21 UTC (rev 30326)
@@ -16,8 +16,13 @@
 """
 
 class ConnectionRegistry:
-    '''ZODB connection registry'''
+    '''ZODB connection registry
 
+    This registry can hold either ZODB.Connection objects or OFS.Application
+    objects. In the latter case, a close operation will close the REQUEST as
+    well as the Connection referenced by the Application's _p_jar attribute.
+    '''
+
     def __init__(self):
         self._conns = []
 
@@ -36,14 +41,21 @@
     def close(self, conn):
         if self.contains(conn):
             self._conns.remove(conn)
-        conn.close()
+        self._do_close(conn)
 
     def closeAll(self):
         for conn in self._conns:
-            conn.close()
+            self._do_close(conn)
         self._conns = []
 
+    def _do_close(self, conn):
+        if hasattr(conn, 'close'):
+            conn.close()
+        else:
+            conn.REQUEST.close()
+            conn._p_jar.close()
 
+
 registry = ConnectionRegistry()
 register = registry.register
 contains = registry.contains

Modified: Zope/trunk/lib/python/Testing/ZopeTestCase/sandbox.py
===================================================================
--- Zope/trunk/lib/python/Testing/ZopeTestCase/sandbox.py	2005-05-11 14:06:08 UTC (rev 30325)
+++ Zope/trunk/lib/python/Testing/ZopeTestCase/sandbox.py	2005-05-11 16:03:21 UTC (rev 30326)
@@ -33,14 +33,15 @@
     def _app(self):
         '''Returns the app object for a test.'''
         app = Zope2.app(Zope2.sandbox().open())
-        connections.register(app._p_jar)
         AppZapper().set(app)
-        return utils.makerequest(app)
+        app = utils.makerequest(app)
+        connections.register(app)
+        return app
 
     def _close(self):
         '''Clears the transaction and the AppZapper.'''
+        AppZapper().clear()
         transaction.abort()
-        AppZapper().clear()
         connections.closeAll()
 
 

Modified: Zope/trunk/lib/python/Testing/ZopeTestCase/testBaseTestCase.py
===================================================================
--- Zope/trunk/lib/python/Testing/ZopeTestCase/testBaseTestCase.py	2005-05-11 14:06:08 UTC (rev 30325)
+++ Zope/trunk/lib/python/Testing/ZopeTestCase/testBaseTestCase.py	2005-05-11 16:03:21 UTC (rev 30326)
@@ -31,7 +31,9 @@
 from Testing.ZopeTestCase import base
 from Testing.ZopeTestCase import utils
 from Testing.ZopeTestCase import connections
+from Testing.ZopeTestCase import sandbox
 
+from Acquisition import aq_base
 from AccessControl import getSecurityManager
 from AccessControl.SecurityManagement import newSecurityManager
 
@@ -121,6 +123,36 @@
         self._clear()
         self.assertEqual(getSecurityManager().getUser().getUserName(), 'Anonymous User')
 
+    def testClearSurvivesDoubleCall(self):
+        self._called = []
+        self._clear()
+        self._clear()
+        self.assertHooks(['afterClear', 'afterClear'])
+
+    def testClearSurvivesClosedConnection(self):
+        self._called = []
+        self._close()
+        self._clear()
+        self.assertHooks(['afterClear'])
+
+    def testClearSurvivesBrokenApp(self):
+        self._called = []
+        self.app = None
+        self._clear()
+        self.assertHooks(['afterClear'])
+
+    def testClearSurvivesMissingApp(self):
+        self._called = []
+        delattr(self, 'app')
+        self._clear()
+        self.assertHooks(['afterClear'])
+
+    def testClearSurvivesMissingRequest(self):
+        self._called = []
+        self.app = aq_base(self.app)
+        self._clear()
+        self.assertHooks(['afterClear'])
+
     def testCloseAbortsTransaction(self):
         self.assertEqual(len(self.getObjectsInTransaction()), 0)
         self.app.foo = 1
@@ -160,7 +192,8 @@
 
 class TestSetUpRaises(HookTest):
 
-    class Error: pass
+    class Error:
+        pass
 
     def setUp(self):
         try:
@@ -180,7 +213,8 @@
 
 class TestTearDownRaises(HookTest):
 
-    class Error: pass
+    class Error:
+        pass
 
     def tearDown(self):
         self._called = []
@@ -200,90 +234,109 @@
 
 
 class TestConnectionRegistry(base.TestCase):
+    '''Test the registry with Connection-like objects'''
 
     class Conn:
-        closed = 0
+        _closed = 0
         def close(self):
-            self.closed = 1
+            self._closed = 1
+        def closed(self):
+            return self._closed
 
+    Klass = Conn
+
     def afterSetUp(self):
         self.reg = connections.ConnectionRegistry()
-        self.conns = [self.Conn(), self.Conn(), self.Conn()]
+        self.conns = [self.Klass(), self.Klass(), self.Klass()]
+        for conn in self.conns:
+            self.reg.register(conn)
 
     def testRegister(self):
         # Should be able to register connections
-        for conn in self.conns:
-            self.reg.register(conn)
         assert len(self.reg) == 3
         assert self.reg.count() == 3
 
     def testCloseConnection(self):
         # Should be able to close a single registered connection
-        for conn in self.conns:
-            self.reg.register(conn)
         assert len(self.reg) == 3
         self.reg.close(self.conns[0])
         assert len(self.reg) == 2
-        assert self.conns[0].closed == 1
-        assert self.conns[1].closed == 0
-        assert self.conns[2].closed == 0
+        assert self.conns[0].closed() == 1
+        assert self.conns[1].closed() == 0
+        assert self.conns[2].closed() == 0
 
     def testCloseSeveralConnections(self):
         # Should be able to close all registered connections one-by-one
-        for conn in self.conns:
-            self.reg.register(conn)
         assert len(self.reg) == 3
         self.reg.close(self.conns[0])
         assert len(self.reg) == 2
-        assert self.conns[0].closed == 1
-        assert self.conns[1].closed == 0
-        assert self.conns[2].closed == 0
+        assert self.conns[0].closed() == 1
+        assert self.conns[1].closed() == 0
+        assert self.conns[2].closed() == 0
         self.reg.close(self.conns[2])
         assert len(self.reg) == 1
-        assert self.conns[0].closed == 1
-        assert self.conns[1].closed == 0
-        assert self.conns[2].closed == 1
+        assert self.conns[0].closed() == 1
+        assert self.conns[1].closed() == 0
+        assert self.conns[2].closed() == 1
         self.reg.close(self.conns[1])
         assert len(self.reg) == 0
-        assert self.conns[0].closed == 1
-        assert self.conns[1].closed == 1
-        assert self.conns[2].closed == 1
+        assert self.conns[0].closed() == 1
+        assert self.conns[1].closed() == 1
+        assert self.conns[2].closed() == 1
 
     def testCloseForeignConnection(self):
         # Should be able to close a connection that has not been registered
-        for conn in self.conns:
-            self.reg.register(conn)
         assert len(self.reg) == 3
-        conn = self.Conn()
+        conn = self.Klass()
         self.reg.close(conn)
         assert len(self.reg) == 3
-        assert self.conns[0].closed == 0
-        assert self.conns[1].closed == 0
-        assert self.conns[2].closed == 0
-        assert conn.closed == 1
+        assert self.conns[0].closed() == 0
+        assert self.conns[1].closed() == 0
+        assert self.conns[2].closed() == 0
+        assert conn.closed() == 1
 
     def testCloseAllConnections(self):
         # Should be able to close all registered connections at once
-        for conn in self.conns:
-            self.reg.register(conn)
         assert len(self.reg) == 3
         self.reg.closeAll()
         assert len(self.reg) == 0
-        assert self.conns[0].closed == 1
-        assert self.conns[1].closed == 1
-        assert self.conns[2].closed == 1
+        assert self.conns[0].closed() == 1
+        assert self.conns[1].closed() == 1
+        assert self.conns[2].closed() == 1
 
     def testContains(self):
         # Should be able to check if a connection is registered
-        for conn in self.conns:
-            self.reg.register(conn)
         assert len(self.reg) == 3
         assert self.reg.contains(self.conns[0])
         assert self.reg.contains(self.conns[1])
         assert self.reg.contains(self.conns[2])
 
 
+class TestApplicationRegistry(TestConnectionRegistry):
+    '''Test the registry with Application-like objects'''
+
+    class App:
+        class Conn:
+            _closed = 0
+            def close(self):
+                self._closed = 1
+            def closed(self):
+                return self._closed
+
+        def __init__(self):
+            self.REQUEST = self.Conn()
+            self._p_jar = self.Conn()
+
+        def closed(self):
+            if self.REQUEST.closed() and self._p_jar.closed():
+                return 1
+            return 0
+
+    Klass = App
+
+
 class TestListConverter(base.TestCase):
+    '''Test utils.makelist'''
 
     def testList0(self):
         self.assertEqual(utils.makelist([]), [])
@@ -339,9 +392,12 @@
 
 import gc
 _sentinel1 = []
+_sentinel2 = []
+_sentinel3 = []
 
+
 class TestRequestGarbage1(base.TestCase):
-    '''Make sure we do not leak REQUEST._held (and REQUEST.other)'''
+    '''Make sure base.app + base.close does not leak REQUEST._held'''
 
     class Held:
         def __del__(self):
@@ -357,10 +413,8 @@
         self.assertEqual(_sentinel1, ['__del__'])
 
 
-_sentinel2 = []
-
 class TestRequestGarbage2(base.TestCase):
-    '''Make sure we do not leak REQUEST._held (and REQUEST.other)'''
+    '''Make sure self._app + self._clear does not leak REQUEST._held'''
 
     class Held:
         def __del__(self):
@@ -375,6 +429,22 @@
         self.assertEqual(_sentinel2, ['__del__'])
 
 
+class TestRequestGarbage3(sandbox.Sandboxed, base.TestCase):
+    '''Make sure self._app + self._clear does not leak REQUEST._held'''
+
+    class Held:
+        def __del__(self):
+            _sentinel3.append('__del__')
+
+    def afterSetUp(self):
+        self.app.REQUEST._hold(self.Held())
+
+    def testClearClosesRequest(self):
+        self._clear()
+        gc.collect()
+        self.assertEqual(_sentinel3, ['__del__'])
+
+
 def test_suite():
     from unittest import TestSuite, makeSuite
     suite = TestSuite()
@@ -382,10 +452,12 @@
     suite.addTest(makeSuite(TestSetUpRaises))
     suite.addTest(makeSuite(TestTearDownRaises))
     suite.addTest(makeSuite(TestConnectionRegistry))
+    suite.addTest(makeSuite(TestApplicationRegistry))
     suite.addTest(makeSuite(TestListConverter))
     suite.addTest(makeSuite(TestRequestVariables))
     suite.addTest(makeSuite(TestRequestGarbage1))
     suite.addTest(makeSuite(TestRequestGarbage2))
+    suite.addTest(makeSuite(TestRequestGarbage3))
     return suite
 
 if __name__ == '__main__':

Modified: Zope/trunk/lib/python/Testing/ZopeTestCase/testPortalTestCase.py
===================================================================
--- Zope/trunk/lib/python/Testing/ZopeTestCase/testPortalTestCase.py	2005-05-11 14:06:08 UTC (rev 30325)
+++ Zope/trunk/lib/python/Testing/ZopeTestCase/testPortalTestCase.py	2005-05-11 16:03:21 UTC (rev 30326)
@@ -42,7 +42,7 @@
     return hasattr(aq_base(ob), attr)
 
 
-# Dummy Portal
+# A dummy portal
 
 from OFS.SimpleItem import SimpleItem
 from OFS.Folder import Folder
@@ -66,7 +66,7 @@
         portal.Members.manage_addFolder(member_id)
     def getHomeFolder(self, member_id):
         portal = self.aq_inner.aq_parent
-        return portal.Members[member_id]
+        return getattr(portal.Members, member_id)
 
 
 class TestPortalTestCase(ZopeTestCase.PortalTestCase):
@@ -78,7 +78,7 @@
     def getPortal(self):
         # Must make sure we return a portal object
         self.app._setObject(portal_name, DummyPortal(portal_name))
-        return self.app[portal_name]
+        return getattr(self.app, portal_name)
 
     def setUp(self):
         # For this test case we *want* to start
@@ -391,7 +391,7 @@
 
     def getPortal(self):
         self.app._setObject(portal_name, DummyPortal(portal_name))
-        return self.app[portal_name]
+        return getattr(self.app, portal_name)
 
     def testGetUserDoesNotWrapUser(self):
         user = self.portal.acl_users.getUserById(user_name)
@@ -412,7 +412,7 @@
 
     def getPortal(self):
         self.app._setObject(portal_name, DummyPortal(portal_name))
-        return self.app[portal_name]
+        return getattr(self.app, portal_name)
 
     def _setupUserFolder(self):
         self.portal._setObject('acl_users', WrappingUserFolder())
@@ -458,12 +458,13 @@
 
 class TestSetUpRaises(HookTest):
 
+    class Error:
+        pass
+
     def getPortal(self):
         self.app._setObject(portal_name, DummyPortal(portal_name))
-        return self.app[portal_name]
+        return getattr(self.app, portal_name)
 
-    class Error: pass
-
     def setUp(self):
         try:
             HookTest.setUp(self)



More information about the Zope-Checkins mailing list