[Zope-CVS] CVS: Products/AdaptableStorage/tests - Zope2TestBase.py:1.4.2.1 testASStorage.py:1.4.2.1 testAll.py:1.3.2.1 testSerialization.py:1.7.2.1 testZope2FS.py:1.10.2.1

Christian Zagrodnick cz@gocept.com
Mon, 13 Jan 2003 14:16:25 -0500


Update of /cvs-repository/Products/AdaptableStorage/tests
In directory cvs.zope.org:/tmp/cvs-serv19230

Modified Files:
      Tag: zagy-patches
	testASStorage.py testAll.py testSerialization.py 
	testZope2FS.py 
Added Files:
      Tag: zagy-patches
	Zope2TestBase.py 
Log Message:
merging HEAD into zagy-patches branch

=== Added File Products/AdaptableStorage/tests/Zope2TestBase.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test of storing folders on the filesystem via ZODB

$Id: Zope2TestBase.py,v 1.4.2.1 2003/01/13 19:16:22 zagy Exp $
"""

from Acquisition import aq_base
from ZODB import Persistent, POSException
from Persistence import PersistentMapping
from OFS.Folder import Folder
from OFS.ObjectManager import ObjectManager
from OFS.SimpleItem import SimpleItem
from AccessControl.User import User, UserFolder

from Products.AdaptableStorage.patches import applySetObPatch


class TestFolder(Folder):

    meta_type = 'Zope2FS Test Folder'

    def __init__(self, title):
        self.title = title


class TestObjectManager(ObjectManager):

    meta_type = 'Zope2FS Test ObjectManager'

    def __init__(self, title):
        self.title = title


class TestFile(SimpleItem):

    meta_type = 'Zope2FS Test File'

    def __init__(self, content):
        self.content = content


class Zope2TestBase:

    def testLoad(self):
        conn = self.db.open()
        try:
            app = conn.root()['Application']
            app.getId()
        finally:
            conn.close()

    def testStore(self):
        conn = self.db.open()
        try:
            app = conn.root()['Application']
            f = Folder()
            f.id = 'Holidays'
            app._setObject(f.id, f, set_owner=0)
            get_transaction().commit()

            f2 = Folder()
            f2.id = 'Christmas'
            f._setObject(f2.id, f2, set_owner=0)
            get_transaction().commit()

            f3 = Folder()
            f3.id = 'Eve'
            f2._setObject(f3.id, f3, set_owner=0)
            get_transaction().commit()

            conn2 = self.db.open()
            try:
                app = conn2.root()['Application']
                self.assert_(hasattr(app, 'Holidays'))
                self.assert_(hasattr(app.Holidays, 'Christmas'))
                self.assert_(hasattr(app.Holidays.Christmas, 'Eve'))
            finally:
                conn2.close()

        finally:
            conn.close()

    def testAnyFolderishStorage(self):
        # Try to store a folderish object of an otherwise unknown class
        conn = self.db.open()
        try:
            app = conn.root()['Application']
            f = Folder()
            f.id = 'Holidays'
            app._setObject(f.id, f, set_owner=0)
            get_transaction().commit()

            f2 = TestFolder("New Year's Eve")
            f2.id = 'NewYear'
            f._setObject(f2.id, f2, set_owner=0)
            get_transaction().commit()

            # Verify the object is in its own database record
            self.assertNotEqual(f2._p_oid, None)
            f2._p_changed = None
            self.assert_(f2._p_changed is None)

            # Verify the ability to load it
            conn2 = self.db.open()
            try:
                app2 = conn2.root()['Application']
                ff = app2.Holidays.NewYear
                self.assertEqual(ff.title, "New Year's Eve")
                self.assertEqual(ff.__class__, TestFolder)
            finally:
                conn2.close()
        finally:
            conn.close()


    def testAnyFolderWithoutPropertiesStorage(self):
        # Try to store a folderish object that does not implement
        # PropertyManager (tests OptionalAspect)
        conn = self.db.open()
        try:
            app = conn.root()['Application']
            f = TestObjectManager("* Holiday Calendar *")
            f.id = 'Holidays'
            app._setObject(f.id, f, set_owner=0)
            get_transaction().commit()

            # Verify the ability to load it
            conn2 = self.db.open()
            try:
                app2 = conn2.root()['Application']
                ff = app2.Holidays
                self.assertEqual(ff.title, "* Holiday Calendar *")
                self.assertEqual(ff.__class__, TestObjectManager)
            finally:
                conn2.close()
        finally:
            conn.close()


    def testAnyFileishStorage(self):
        # Try to store a fileish object of an otherwise unknown class
        conn = self.db.open()
        try:
            content = 'insert wise expression here'

            app = conn.root()['Application']
            f = TestFile(content)
            f.id = 'testitem'
            app._setObject(f.id, f, set_owner=0)
            get_transaction().commit()

            # Verify the object is in its own database record
            self.assertNotEqual(f._p_oid, None)
            f._p_changed = None
            self.assert_(f._p_changed is None)

            # Verify the ability to load it
            conn2 = self.db.open()
            try:
                app2 = conn2.root()['Application']
                ff = app2.testitem
                self.assertEqual(ff.content, content)
                self.assertEqual(ff.__class__, TestFile)
            finally:
                conn2.close()
        finally:
            conn.close()


    def testStoreProperties(self):
        conn = self.db.open()
        try:
            app = conn.root()['Application']
            f = Folder()
            f.id = 'Holidays'
            f.title = 'Holiday Calendar'
            app._setObject(f.id, f, set_owner=0)
            get_transaction().commit()

            f._setProperty('pi', 3.14, 'float')
            f._setProperty('stuff', ['a', 'bc', 'd'], 'lines')
            get_transaction().commit()

            conn2 = self.db.open()
            try:
                app = conn2.root()['Application']
                self.assert_(hasattr(app, 'Holidays'))
                got = 0
                for k, v in app.Holidays.propertyItems():
                    if k == 'title':
                        got += 1
                        self.assertEqual(v, 'Holiday Calendar')
                    elif k == 'pi':
                        got += 1
                        self.assertEqual(v, 3.14)
                    elif k == 'stuff':
                        got += 1
                        self.assertEqual(v, ['a', 'bc', 'd'])
                self.assertEqual(got, 3)
            finally:
                conn2.close()

        finally:
            conn.close()


    def testStoreUserFolder(self):
        conn = self.db.open()
        try:
            app = conn.root()['Application']
            f = UserFolder()
            f.id = 'acl_users'
            app._setObject(f.id, f, set_owner=0)
            f._doAddUser('ned', 'abcdefg', ('Serf', 'Knight', 'King'), ())
            f._doAddUser('joe', '123', ('Geek',), ())
            get_transaction().commit()

            # Be sure ZODB sees the unmanaged persistent objects
            u = f.data['ned']
            self.assertEqual(f.data._p_oid, 'unmanaged')
            self.assertEqual(u._p_oid, 'unmanaged')

            # Make some changes
            u.roles = ('Knight', 'King')
            u.domains = ('localhost',)
            del f.data['joe']           # Test user deletion
            get_transaction().commit()

            conn2 = self.db.open()
            try:
                app = conn2.root()['Application']
                ff = app.acl_users
                self.assert_(aq_base(app.__allow_groups__) is aq_base(ff))
                self.assertEqual(len(ff.data), 1)
                user = ff.data['ned']
                self.assertEqual(user.name, 'ned')
                self.assertEqual(user.roles, ('Knight', 'King'))
                self.assertEqual(user.domains, ('localhost',))
                self.assert_(user is not u)
            finally:
                conn2.close()

        finally:
            conn.close()


    def testNewObjectConflictDetection(self):
        # Verify a new object won't overwrite existing objects by accident
        conn = self.db.open()
        try:
            app = conn.root()['Application']
            app.some_attr = 'stuff'
            app._p_serial = '\0' * 8  # Pretend that it's new
            self.assertRaises(POSException.ConflictError,
                              get_transaction().commit)
        finally:
            conn.close()


    def testRename(self):
        applySetObPatch()  # Required for this test to work with Zope2FS

        conn = self.db.open()
        try:
            app = conn.root()['Application']
            f = Folder()
            f.id = 'Holidays'
            app._setObject(f.id, f, set_owner=0)
            get_transaction().commit()

            # Do what manage_rename does, without the security checks
            ob = app.Holidays.aq_base
            app._delObject('Holidays')
            ob._setId('HolidayCalendar')
            app._setObject(ob.id, ob, set_owner=0)
            get_transaction().commit()

            self.assert_(hasattr(app, 'HolidayCalendar'))
            self.assert_(not hasattr(app, 'Holidays'))

            conn2 = self.db.open()
            try:
                app = conn2.root()['Application']
                self.assert_(hasattr(app, 'HolidayCalendar'))
                self.assert_(not hasattr(app, 'Holidays'))
            finally:
                conn2.close()

        finally:
            conn.close()


=== Products/AdaptableStorage/tests/testASStorage.py 1.4 => 1.4.2.1 ===
--- Products/AdaptableStorage/tests/testASStorage.py:1.4	Fri Dec 13 15:42:03 2002
+++ Products/AdaptableStorage/tests/testASStorage.py	Mon Jan 13 14:16:22 2003
@@ -17,6 +17,7 @@
 """
 
 import unittest
+from thread import start_new_thread, allocate_lock
 
 import ZODB
 from Persistence import PersistentMapping
@@ -27,7 +28,21 @@
 from SerialTestBase import SerialTestBase
 
 
+def run_in_thread(f):
+    lock = allocate_lock()
+    def run(f=f, lock=lock):
+        try:
+            f()
+        finally:
+            lock.release()
+    lock.acquire()
+    start_new_thread(run, ())
+    lock.acquire()
+    lock.release()
+
+
 class ASStorageTests (SerialTestBase, unittest.TestCase):
+    # Tests of ASStorage, not including ASConnection.
 
     def setUp(self):
         SerialTestBase.setUp(self)
@@ -122,14 +137,16 @@
             self.assertEqual(ob1.items(), [('a', 'b')])
             self.assertEqual(ob1.stowaway.items(), [('c', 'd')])
 
-            # Verify a new object was stored and make a change only to
-            # the unmanaged persistent object (the "stowaway").
+            # Verify a new object was stored
             get_transaction().begin()
             conn2 = self.db.open()
             ob2 = conn2.loadStub(('test2',))
             self.assertEqual(ob2.items(), [('a', 'b')])
             self.assertEqual(ob2.stowaway.items(), [('c', 'd')])
-            ob2.stowaway['c'] = 'e'
+
+            # Make a change only to the unmanaged persistent object
+            # (the "stowaway").
+            ob.stowaway['c'] = 'e'
             get_transaction().commit()
 
             # Verify the change was stored and make a change to the
@@ -163,7 +180,81 @@
                 conn3.close()
 
 
+    def testStoreAndLoadBinary(self):
+        ob = PersistentMapping()
+        # strdata contains binary characters
+        ob.strdata = ''.join([chr(n) for n in range(256)]) * 2
+
+        dummy = PersistentMapping()
+
+        conn1 = self.db.open()
+        try:
+            root = conn1.root()
+            get_transaction().begin()
+            root['TestRoot'] = ob
+            root['TestRoot2'] = dummy
+            get_transaction().commit()
+            ob1 = conn1.loadStub(('test',))
+            self.assertEqual(ob1.strdata, ob.strdata)
+            self.assertEqual(ob1.items(), ob.items())
+        finally:
+            conn1.close()
+
+
+    def _changeTestRoot(self):
+        conn2 = self.db.open()
+        try:
+            ob2 = conn2.root()['TestRoot']
+            ob2.strdata = 'ghi'
+            get_transaction().commit()
+        finally:
+            conn2.close()
 
+
+    def testConflictDetection(self):
+        ob1 = PersistentMapping()
+        ob1.strdata = 'abc'
+
+        dummy = PersistentMapping()
+
+        conn1 = self.db.open()
+        try:
+            root = conn1.root()
+            get_transaction().begin()
+            root['TestRoot'] = ob1
+            root['TestRoot2'] = dummy
+            get_transaction().commit()
+            ob1.strdata = 'def'
+            run_in_thread(self._changeTestRoot)
+            # Verify that "def" doesn't get written, since it
+            # conflicts with "ghi".
+            self.assertRaises(ZODB.POSException.ConflictError,
+                              get_transaction().commit)
+            self.assertEqual(ob1.strdata, "ghi")
+        finally:
+            conn1.close()
+
+
+    def testNewObjectConflictDetection(self):
+        # Verify a new object won't overwrite existing objects by accident
+        ob1 = PersistentMapping()
+        ob1.strdata = 'abc'
+
+        dummy = PersistentMapping()
+
+        conn1 = self.db.open()
+        try:
+            root = conn1.root()
+            get_transaction().begin()
+            root['TestRoot'] = ob1
+            root['TestRoot2'] = dummy
+            get_transaction().commit()
+            ob1.strdata = 'def'
+            ob1._p_serial = '\0' * 8  # Pretend that it's new
+            self.assertRaises(ZODB.POSException.ConflictError,
+                              get_transaction().commit)
+        finally:
+            conn1.close()
 
 
 if __name__ == '__main__':


=== Products/AdaptableStorage/tests/testAll.py 1.3 => 1.3.2.1 ===
--- Products/AdaptableStorage/tests/testAll.py:1.3	Tue Dec 31 16:09:05 2002
+++ Products/AdaptableStorage/tests/testAll.py	Mon Jan 13 14:16:22 2003
@@ -16,7 +16,7 @@
 $Id$
 """
 
-import unittest
+import sys, unittest
 
 from testSerialization import SerializationTests
 from testInterfaceImpl import InterfaceImplTests
@@ -26,13 +26,21 @@
 try:
     import psycopg
 except ImportError:
-    pass
+    sys.stderr.write('Warning: could not import psycopg.\n')
+    sys.stderr.write('Not running the PostgreSQL tests.\n')
 else:
-    # Run the Postgres tests.
-    from Products.AdaptableStorage.gateway_sql.tests.testZope2SQL \
-         import Zope2SQLTests
-    from Products.AdaptableStorage.gateway_sql.tests.testInterfaceImpl \
-         import InterfaceImplTests as InterfaceImplTests2
+    try:
+        c = psycopg.connect('')
+        c.close()
+    except psycopg.DatabaseError:
+        sys.stderr.write('Warning: could not open a psycopg connection.\n')
+        sys.stderr.write('Not running the PostgreSQL tests.\n')
+    else:
+        # Run the PostgreSQL tests.
+        from Products.AdaptableStorage.gateway_sql.tests.testZope2SQL \
+             import Zope2SQLTests
+        from Products.AdaptableStorage.gateway_sql.tests.testInterfaceImpl \
+             import InterfaceImplTests as InterfaceImplTests2
 
 
 if __name__ == '__main__':


=== Products/AdaptableStorage/tests/testSerialization.py 1.7 => 1.7.2.1 ===
--- Products/AdaptableStorage/tests/testSerialization.py:1.7	Tue Dec 31 16:47:52 2002
+++ Products/AdaptableStorage/tests/testSerialization.py	Mon Jan 13 14:16:22 2003
@@ -61,8 +61,10 @@
         mapper = self.root_mapper.getSubMapper('test_mapper')
         event = sfw.SerializationEvent(kos, mapper, ('',), ob)
         full_state = mapper.getSerializer().serialize(ob, event)
-        event = sfw.MapperEvent(mapper, ('',))
+        event = sfw.StoreEvent(mapper, ('',))
         mapper.getGateway().store(event, full_state)
+
+        event = sfw.LoadEvent(mapper, ('',))
         full_state, serial = mapper.getGateway().load(event)
         ob2 = PersistentMapping()
         event = sfw.DeserializationEvent(kos, mapper, ('',), ob2)
@@ -93,15 +95,18 @@
         mapper = self.root_mapper.getSubMapper('test_mapper_2')
         event = sfw.SerializationEvent(kos, mapper, ('',), ob)
         full_state = mapper.getSerializer().serialize(ob, event)
-        event = sfw.MapperEvent(mapper, ('',))
+        event = sfw.StoreEvent(mapper, ('',))
         mapper.getGateway().store(event, full_state)
+
         # Now load the state into a different object
+        event = sfw.LoadEvent(mapper, ('',))
         full_state, serial = mapper.getGateway().load(event)
         ob2 = PersistentMapping()
         event = sfw.DeserializationEvent(kos, mapper, ('',), ob2)
         mapper.getSerializer().deserialize(ob2, event, full_state)
         self.assertEqual(ob.extra.data, ob2.extra.data)
         self.assertEqual(ob.keys(), ob2.keys())
+
         # Check that both see the *same* object
         self.assert_(ob2['a'] is ob2.extra, (ob2['a'], ob2.extra))
         self.assert_(ob2['a'] is not data)  # Verify it didn't cheat somehow


=== Products/AdaptableStorage/tests/testZope2FS.py 1.10 => 1.10.2.1 ===
--- Products/AdaptableStorage/tests/testZope2FS.py:1.10	Fri Jan  3 17:04:24 2003
+++ Products/AdaptableStorage/tests/testZope2FS.py	Mon Jan 13 14:16:22 2003
@@ -21,34 +21,15 @@
 import unittest
 from tempfile import mktemp
 
-from ZODB import Persistent
-from Persistence import PersistentMapping
-from OFS.Folder import Folder
-from OFS.SimpleItem import SimpleItem
-
 from Products.AdaptableStorage.zodb.ASDB import ASDB
 from Products.AdaptableStorage.zodb.ASStorage import ASStorage
 from Products.AdaptableStorage.zodb.StaticResource import StaticResource
 from Products.AdaptableStorage.Zope2FS import createMapper
 
+from Zope2TestBase import Zope2TestBase, Folder
 
-class TestFolder(Folder):
-
-    meta_type = 'Zope2FS Test Folder'
-
-    def __init__(self, title):
-        self.title = title
-
-
-class TestFile(SimpleItem):
-
-    meta_type = 'Zope2FS Test File'
-
-    def __init__(self, content):
-        self.content = content
 
-
-class Zope2FSTests (unittest.TestCase):
+class Zope2FSTests (unittest.TestCase, Zope2TestBase):
 
     def _createMapper(self, path):
         return createMapper(path)
@@ -66,53 +47,15 @@
         self.storage = storage
         db = ASDB(storage, resource)
         self.db = db
+        get_transaction().begin()
 
     def tearDown(self):
+        get_transaction().abort()
         self.db.close()
         rmtree(self.path)
 
-    def testLoad(self):
-        conn = self.db.open()
-        try:
-            app = conn.root()['Application']
-            app.getId()
-        finally:
-            conn.close()
-
-    def testStore(self):
-        conn = self.db.open()
-        try:
-            app = conn.root()['Application']
-            f = Folder()
-            f.id = 'Holidays'
-            app._setObject(f.id, f, set_owner=0)
-            get_transaction().commit()
-
-            f2 = Folder()
-            f2.id = 'Christmas'
-            f._setObject(f2.id, f2, set_owner=0)
-            get_transaction().commit()
-
-            f3 = Folder()
-            f3.id = 'Eve'
-            f2._setObject(f3.id, f3, set_owner=0)
-            get_transaction().commit()
-
-            conn2 = self.db.open()
-            try:
-                app = conn2.root()['Application']
-                self.assert_(hasattr(app, 'Holidays'))
-                self.assert_(hasattr(app.Holidays, 'Christmas'))
-                self.assert_(hasattr(app.Holidays.Christmas, 'Eve'))
-            finally:
-                conn2.close()
-
-        finally:
-            conn.close()
-
     def testClassificationPreservation(self):
-        # Ensure that classification doesn't get forgotten, even though
-        # it's the container that writes it, not the object.
+        # Ensure that classification doesn't get forgotten.
         conn = self.db.open()
         try:
             app = conn.root()['Application']
@@ -140,8 +83,10 @@
             conn.close()
 
 
-    def testAnyFolderishStorage(self):
-        # Try to store a folderish object of an otherwise unknown class
+    def testMismatchedIdDetection(self):
+        # FSAutoID should detect when the keychain and ID don't match.
+        # Normally, the only time they don't match is when an object
+        # has been moved.
         conn = self.db.open()
         try:
             app = conn.root()['Application']
@@ -150,92 +95,32 @@
             app._setObject(f.id, f, set_owner=0)
             get_transaction().commit()
 
-            f2 = TestFolder("New Year's Eve")
-            f2.id = 'NewYear'
-            f._setObject(f2.id, f2, set_owner=0)
-            get_transaction().commit()
-
-            # Verify the object is in its own database record
-            self.assertNotEqual(f2._p_oid, None)
-            f2._p_changed = None
-            self.assert_(f2._p_changed is None)
-
-            # Verify the ability to load it
-            conn2 = self.db.open()
-            try:
-                app2 = conn2.root()['Application']
-                ff = app2.Holidays.NewYear
-                self.assertEqual(ff.title, "New Year's Eve")
-                self.assertEqual(ff.__class__, TestFolder)
-            finally:
-                conn2.close()
+            ob = app.Holidays
+            ob._setId('HolidayCalendar')
+            self.assertRaises(ValueError, get_transaction().commit)
         finally:
             conn.close()
 
 
-    def testAnyFileishStorage(self):
-        # Try to store a fileish object of an otherwise unknown class
+    def testReuseId(self):
+        # Verifies that ASConnection doesn't trip over reusing an OID that's
+        # no longer in use.
         conn = self.db.open()
         try:
-            content = 'insert wise expression here'
-
             app = conn.root()['Application']
-            f = TestFile(content)
-            f.id = 'testitem'
+            f = Folder()
+            f.id = 'Holidays'
             app._setObject(f.id, f, set_owner=0)
             get_transaction().commit()
 
-            # Verify the object is in its own database record
-            self.assertNotEqual(f._p_oid, None)
-            f._p_changed = None
-            self.assert_(f._p_changed is None)
-
-            # Verify the ability to load it
-            conn2 = self.db.open()
-            try:
-                app2 = conn2.root()['Application']
-                ff = app2.testitem
-                self.assertEqual(ff.content, content)
-                self.assertEqual(ff.__class__, TestFile)
-            finally:
-                conn2.close()
-        finally:
-            conn.close()
-
+            f = None  # Forget the reference to folder
+            app._delObject('Holidays')
+            get_transaction().commit()
 
-    def testStoreProperties(self):
-        conn = self.db.open()
-        try:
-            app = conn.root()['Application']
             f = Folder()
             f.id = 'Holidays'
-            f.title = 'Holiday Calendar'
             app._setObject(f.id, f, set_owner=0)
             get_transaction().commit()
-
-            f._setProperty('pi', 3.14, 'float')
-            f._setProperty('stuff', ['a', 'bc', 'd'], 'lines')
-            get_transaction().commit()
-
-            conn2 = self.db.open()
-            try:
-                app = conn2.root()['Application']
-                self.assert_(hasattr(app, 'Holidays'))
-                got = 0
-                for k, v in app.Holidays.propertyItems():
-                    if k == 'title':
-                        got += 1
-                        self.assertEqual(v, 'Holiday Calendar')
-                    elif k == 'pi':
-                        got += 1
-                        self.assertEqual(v, 3.14)
-                    elif k == 'stuff':
-                        got += 1
-                        self.assertEqual(v, ['a', 'bc', 'd'])
-                self.assertEqual(got, 3)
-            finally:
-                conn2.close()
-
         finally:
             conn.close()