[Zope-CVS] CVS: Products/AdaptableStorage/tests - .cvsignore:1.1 SerialTestBase.py:1.1 __init__.py:1.1 testASStorage.py:1.1 testAll.py:1.1 testInterfaceImpl.py:1.1 testSerialization.py:1.1 testZope2FS.py:1.1

Shane Hathaway shane@zope.com
Wed, 27 Nov 2002 13:37:09 -0500


Update of /cvs-repository/Products/AdaptableStorage/tests
In directory cvs.zope.org:/tmp/cvs-serv12157/tests

Added Files:
	.cvsignore SerialTestBase.py __init__.py testASStorage.py 
	testAll.py testInterfaceImpl.py testSerialization.py 
	testZope2FS.py 
Log Message:
Moved the latest AdaptableStorage work out of the private repository.
It took a long time, but I moved it as soon as all the unit tests
passed and I felt that all the interface names and conventions were
good enough.

Documentation is still minimal, but now I think the system is finally
straight enough in my head to write down. :-) If you want a sneak
peek, the interfaces have some docstrings if you're looking for a
"tree" view, while the OpenOffice diagram presents something of a
"forest" view.

Also note that I'm trying a new coding convention.  The "public"
module in each package defines exactly which objects should be
exported from the package.  This solves a few problems with imports
such as doubling of names and shadowing of modules.  Overall, the
"public" module makes it easier to tell which classes are supposed to
be used by other packages, and makes it easier for other packages to
use the public classes.  See what you think.



=== Added File Products/AdaptableStorage/tests/.cvsignore ===
*.pyc


=== Added File Products/AdaptableStorage/tests/SerialTestBase.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Serialization test setup/teardown

$Id: SerialTestBase.py,v 1.1 2002/11/27 18:37:08 shane Exp $
"""


import ZODB
from Persistence import PersistentMapping

from Products.AdaptableStorage.serial import public as sfw
from Products.AdaptableStorage.serial_std.public import \
     StringDataAttribute, MappingGateway, FixedPersistentMapping, RollCall


class SimpleItemsAspect:
    """Aspect serializer that records a mapping's items as (name, value) rows.

    A minimal IAspectSerializer implementation used by the test fixture.
    """

    __implements__ = sfw.IAspectSerializer

    # Class-level schema shared by every instance: a 'name' column and a
    # 'value' column.
    schema = sfw.RecordSchema()
    schema.addColumn('name')
    schema.addColumn('value')

    def getSchema(self):
        """Return the schema of the records produced by serialize()."""
        return self.schema

    def serialize(self, object, event):
        """Return the items of `object` as a sorted list of (key, value) pairs.

        Every pair is reported to `event` through notifySerialized(), and
        the 'data' and '_container' attribute names are handed to
        event.ignoreAttribute().
        """
        pairs = object.items()
        # Sort in place so the returned state has a stable order.
        pairs.sort()
        for name, value in pairs:
            event.notifySerialized(name, value)
        # NOTE(review): presumably these are the attributes in which the
        # mapping keeps its contents, so they should not be flagged as
        # unserialized state -- confirm against the event implementation.
        for attr_name in ('data', '_container'):
            event.ignoreAttribute(attr_name)
        return pairs

    def deserialize(self, object, event, state):
        """Re-run the constructor of `object` with the pairs in `state`.

        `state` is iterated as a sequence of (key, value) pairs.
        """
        contents = {}
        for name, value in state:
            contents[name] = value
        object.__init__(contents)


class SerialTestBase:
    """Mixin that builds the domain mapper fixture shared by the test cases.

    setUp() leaves these attributes on the instance: dm, om1, om2,
    items_gw and props_gw.
    """

    def setUp(self):
        """Create a DomainMapper holding a 'test' mapper and a 'root' mapper."""
        self.dm = sfw.DomainMapper(None)

        mapping_class = (('Persistence', 'PersistentMapping'), None)

        # Serializer for the 'test' mapper: mapping items, the 'strdata'
        # attribute, then a roll call.
        test_serializer = sfw.ObjectSerializer(mapping_class)
        items_aspect = SimpleItemsAspect()
        test_serializer.addAspect('items', items_aspect)
        props_aspect = StringDataAttribute('strdata')
        test_serializer.addAspect('properties', props_aspect)
        test_serializer.addAspect('roll_call', RollCall())

        # Gateway for the 'test' mapper: one MappingGateway per stored
        # aspect, each kept on the instance as well.
        test_gateway = sfw.ObjectGateway()
        self.items_gw = MappingGateway(items_aspect.getSchema())
        test_gateway.addGateway('items', self.items_gw)
        self.props_gw = MappingGateway(props_aspect.getSchema())
        test_gateway.addGateway('properties', self.props_gw)

        self.om1 = sfw.ObjectMapper(test_serializer, test_gateway, self.dm)
        self.dm.addMapper('test', self.om1)

        # Serializer for the 'root' mapper: a fixed mapping whose single
        # entry 'TestRoot' refers to ('test', '').
        root_serializer = sfw.ObjectSerializer(mapping_class)
        root_serializer.addAspect(
            'fixed_items', FixedPersistentMapping({'TestRoot': ('test', '')}))
        root_serializer.addAspect('roll_call', RollCall())

        self.om2 = sfw.ObjectMapper(
            root_serializer, sfw.ObjectGateway(), self.dm)
        self.dm.addMapper('root', self.om2)

    def tearDown(self):
        """Do nothing; exists so that subclasses have a tearDown to chain to."""



=== Added File Products/AdaptableStorage/tests/__init__.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""AdaptableStorage tests package

$Id: __init__.py,v 1.1 2002/11/27 18:37:08 shane Exp $
"""



=== Added File Products/AdaptableStorage/tests/testASStorage.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Storage tests (with data stored in simple mappings)

$Id: testASStorage.py,v 1.1 2002/11/27 18:37:08 shane Exp $
"""

import unittest

import ZODB
from Persistence import PersistentMapping

from Products.AdaptableStorage.zodb.ASDB import ASDB
from Products.AdaptableStorage.zodb.ASStorage import ASStorage
from Products.AdaptableStorage.zodb.StaticResource import StaticResource
from SerialTestBase import SerialTestBase


class ASStorageTests(SerialTestBase, unittest.TestCase):
    """Storage tests run against the fixture built by SerialTestBase."""

    def setUp(self):
        """Build the shared fixture, then create a storage and database on it."""
        SerialTestBase.setUp(self)
        resource = StaticResource(self.dm)
        self.storage = ASStorage(resource)
        self.db = ASDB(self.storage, resource)

    def tearDown(self):
        """Close the database, then tear down the shared fixture."""
        self.db.close()
        SerialTestBase.tearDown(self)

    def testStoreAndLoad(self):
        # Stores an object through one connection, then reads and modifies
        # it through two further connections.
        original = PersistentMapping()
        original.strdata = '345'
        original['a'] = 'b'
        original['c'] = 'd'

        conn1 = self.db.open()
        conn2 = conn3 = None
        try:

            # Fetch the root, attach the new object and commit.
            root = conn1['root:']
            get_transaction().begin()
            root['TestRoot'] = original
            get_transaction().commit()
            stored = conn1['test:']
            self.assertEqual(stored.strdata, original.strdata)
            self.assertEqual(stored.items(), original.items())

            # The object must be visible from a second connection; change it.
            get_transaction().begin()
            conn2 = self.db.open()
            second = conn2['test:']
            self.assertEqual(second.strdata, original.strdata)
            self.assertEqual(second.items(), original.items())
            second.strdata = '678'
            get_transaction().commit()

            # A third connection must see that change; change it again.
            conn3 = self.db.open()
            third = conn3['test:']
            self.assertEqual(third.strdata, '678')
            self.assertEqual(third.items(), original.items())
            third.strdata = '901'
            get_transaction().commit()
            # Close and reopen the connection, then read the value back.
            conn3.close()
            # Cleared first so the cleanup below does not close the old
            # connection a second time if open() raises.
            conn3 = None
            conn3 = self.db.open()
            third = conn3['test:']
            self.assertEqual(third.strdata, '901')

            # The object created at the top must still hold its own value.
            self.assertEqual(original.strdata, '345')

        finally:
            # conn1 is always open here; the other two only if we got that far.
            for conn in (conn1, conn2, conn3):
                if conn is not None:
                    conn.close()


# Run the tests in this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()



=== Added File Products/AdaptableStorage/tests/testAll.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Run all unit tests

$Id: testAll.py,v 1.1 2002/11/27 18:37:08 shane Exp $
"""

import unittest

from testSerialization import SerializationTests
from testInterfaceImpl import InterfaceImplTests
from testASStorage import ASStorageTests
from testZope2FS import Zope2FSTests

# unittest.main() loads the TestCase classes found in this module's
# namespace, so the imports above are what make this script run every test.
if __name__ == '__main__':
    unittest.main()



=== Added File Products/AdaptableStorage/tests/testInterfaceImpl.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Interface implementation tests

$Id: testInterfaceImpl.py,v 1.1 2002/11/27 18:37:08 shane Exp $
"""

import unittest
from types import ListType, TupleType

from Interface import Interface
from Interface.Verify import verifyClass


class InterfaceImplTests(unittest.TestCase):
    """Check that objects exported by the public modules honour the
    interfaces they declare in __implements__."""

    def _testObjectImpl(self, c):
        """Verify `c` against everything listed in its __implements__.

        __implements__ may be a single interface or a list/tuple of them.
        """
        declared = c.__implements__
        if not (isinstance(declared, ListType) or
                isinstance(declared, TupleType)):
            self._verify(declared, c)
            return
        for iface in declared:
            self._verify(iface, c)

    def _testAllInModule(self, m):
        """Run _testObjectImpl on each qualifying attribute of module `m`.

        An attribute qualifies when it has __implements__ and
        Interface.isImplementedBy() is false for it.
        """
        for name, candidate in m.__dict__.items():
            if not hasattr(candidate, '__implements__'):
                continue
            if Interface.isImplementedBy(candidate):
                continue
            self._testObjectImpl(candidate)

    def _verify(self, iface, c):
        """Verify `c` against `iface`, then recursively against its bases."""
        verifyClass(iface, c)
        for parent in iface.getBases():
            self._verify(parent, c)

    def testSerialPublicImplementations(self):
        # Everything exported by the serial package.
        from Products.AdaptableStorage.serial import public as exported
        self._testAllInModule(exported)

    def testSerialStdImplementations(self):
        # Everything exported by the serial_std package.
        from Products.AdaptableStorage.serial_std import public as exported
        self._testAllInModule(exported)

    def testSerialOFSImplementations(self):
        # Everything exported by the serial_ofs package.
        from Products.AdaptableStorage.serial_ofs import public as exported
        self._testAllInModule(exported)

    def testGatewayFSImplementations(self):
        # Everything exported by the gateway_fs package.
        from Products.AdaptableStorage.gateway_fs import public as exported
        self._testAllInModule(exported)

# Run the tests in this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()



=== Added File Products/AdaptableStorage/tests/testSerialization.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Serialization tests

$Id: testSerialization.py,v 1.1 2002/11/27 18:37:08 shane Exp $
"""

import unittest

import ZODB
from Persistence import PersistentMapping

from Products.AdaptableStorage.serial import public as sfw
from SerialTestBase import SerialTestBase


class SerializationTests(SerialTestBase, unittest.TestCase):
    """Serializer and gateway tests run directly against the 'test' mapper."""

    def getKeyedObjectSystem(self):
        """Return the keyed object system handed to the serializer."""
        # XXX works for now
        return None

    def testSerializeAndDeserialize(self):
        # Serialize an object and deserialize the state into a fresh one.
        source = PersistentMapping()
        source.strdata = '345'
        source['a'] = 'b'
        source['c'] = 'd'
        kos = self.getKeyedObjectSystem()
        mapper = self.dm.getMapper('test')

        serialize = mapper.getSerializer().serialize
        full_state, refs = serialize(mapper, '', source, kos)

        copy = PersistentMapping()
        deserialize = mapper.getSerializer().deserialize
        deserialize(mapper, '', copy, kos, full_state)

        self.assertEqual(source.strdata, copy.strdata)
        self.assertEqual(source.items(), copy.items())

    def testStoreAndLoad(self):
        # Same round trip, but the state passes through the gateway.
        source = PersistentMapping()
        source.strdata = '345'
        source['a'] = 'b'
        source['c'] = 'd'
        kos = self.getKeyedObjectSystem()
        mapper = self.dm.getMapper('test')

        serialize = mapper.getSerializer().serialize
        full_state, refs = serialize(mapper, '', source, kos)

        mapper.getGateway().store(mapper, '', full_state)
        full_state, serial = mapper.getGateway().load(mapper, '')

        copy = PersistentMapping()
        deserialize = mapper.getSerializer().deserialize
        deserialize(mapper, '', copy, kos, full_state)

        self.assertEqual(source.strdata, copy.strdata)
        self.assertEqual(source.items(), copy.items())

    def testCatchExtraAttribute(self):
        # The 'extra' attribute is not covered by any aspect configured in
        # the fixture, and serializing is expected to raise.
        source = PersistentMapping()
        source.strdata = '345'
        source.extra = '678'
        source['a'] = 'b'
        source['c'] = 'd'
        kos = self.getKeyedObjectSystem()
        mapper = self.dm.getMapper('test')

        serialize = mapper.getSerializer().serialize
        self.assertRaises(
            sfw.SerializationError, serialize, mapper, '', source, kos)


# Run the tests in this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()


=== Added File Products/AdaptableStorage/tests/testZope2FS.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test of storing folders on the filesystem via ZODB

$Id: testZope2FS.py,v 1.1 2002/11/27 18:37:08 shane Exp $
"""

import os
from shutil import rmtree
import unittest
from tempfile import mktemp

import ZODB
from Persistence import PersistentMapping

from Products.AdaptableStorage.zodb.ASDB import ASDB
from Products.AdaptableStorage.zodb.ASStorage import ASStorage
from Products.AdaptableStorage.zodb.StaticResource import StaticResource
from Products.AdaptableStorage.Zope2FS import createDomainMapper


class Zope2FSTests (unittest.TestCase):

    def setUp(self):
        path = mktemp()
        os.mkdir(path)
        self.path = path
        dm = createDomainMapper(path)
        self.dm = dm
        resource = StaticResource(dm)
        storage = ASStorage(resource)
        self.storage = storage
        db = ASDB(storage, resource)
        self.db = db

    def tearDown(self):
        self.db.close()
        rmtree(self.path)

    def testLoad(self):
        conn = self.db.open()
        try:
            app = conn.root()['Application']
            app.getId()
        finally:
            conn.close()

    def testStore(self):
        conn = self.db.open()
        try:
            from OFS.Folder import Folder
            app = conn.root()['Application']
            f = Folder()
            f.id = 'Holidays'
            app._setObject(f.id, f, set_owner=0)
            get_transaction().commit()

            f2 = Folder()
            f2.id = 'Christmas'
            f._setObject(f2.id, f2, set_owner=0)
            get_transaction().commit()

            conn2 = self.db.open()
            try:
                app = conn2.root()['Application']
                self.assert_(hasattr(app, 'Holidays'))
                self.assert_(hasattr(app.Holidays, 'Christmas'))
            finally:
                conn2.close()

        finally:
            conn.close()


# Run the tests in this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()