[Zope3-checkins] CVS: Zope3/lib/python/Zope/App/OFS/Services/ObjectHub/tests - ObjectHubSetup.py:1.1 __init__.py:1.1 testHookedHubEvent.py:1.1 testHubEvent.py:1.1 testObjectHub.py:1.1

Gary Poster gary@modernsongs.com
Tue, 29 Oct 2002 22:47:50 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/App/OFS/Services/ObjectHub/tests
In directory cvs.zope.org:/tmp/cvs-serv4759/App/OFS/Services/ObjectHub/tests

Added Files:
	ObjectHubSetup.py __init__.py testHookedHubEvent.py 
	testHubEvent.py testObjectHub.py 
Log Message:
This checkin cleans up the ObjectHub system.

First of all, the Zope.ObjectHub and Zope.App.OFS.Services.LocalObjectHub packages are gone, replaced by Zope.App.OFS.Services.ObjectHub, as per discussion with Jim and Steve.

Second, the hub events have been modified to match Jim's approach with the ObjectEvents (i.e., events are usually handed an object at the get go whenever possible).  Incidentally, this also coincides with making the "moved" hub event implementation actually coincide with the interface (by including fromLocation).  This is as per discussion with Jim.

Third, lookupLocation and lookupHubid have been switched to getLocation and getHubid.  This is a bit of a ninja-checkin or whatever the term is since I didn't bandy this change about beforehand.  :-(  Sorry.  If folks say nay then I will take responsibility for removing this change.

I think that's about it.




=== Added File Zope3/lib/python/Zope/App/OFS/Services/ObjectHub/tests/ObjectHubSetup.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""

Revision information:
$Id: ObjectHubSetup.py,v 1.1 2002/10/30 03:47:48 poster Exp $
"""


from Zope.App.OFS.Services.LocalEventService.tests.EventSetup import \
     EventSetup
from Zope.ComponentArchitecture import getService, getServiceManager
from Zope.App.OFS.Services.ServiceManager.ServiceDirective \
     import ServiceDirective
from Zope.App.Traversing import getPhysicalPathString

from Zope.App.OFS.Services.ObjectHub.ObjectHub import ObjectHub

class ObjectHubSetup(EventSetup):
    
    def setUp(self):
        EventSetup.setUp(self)
        
        from Zope.App.OFS.Services.ObjectHub.IObjectHub import IObjectHub
        globsm=getServiceManager(None)
        globsm.defineService("ObjectHub", IObjectHub)
        self.createObjectHub()
    
    def createObjectHub(self, folder=None):
        if folder is None: folder=self.rootFolder
        if not folder.hasServiceManager():
            self.createServiceManager(folder)
        sm=getServiceManager(folder) # wrapped now
        sm.Packages['default'].setObject("myObjectHub",ObjectHub())

        path = "%s/Packages/default/myObjectHub" % getPhysicalPathString(sm)
        directive = ServiceDirective("ObjectHub", path)
        sm.Packages['default'].setObject("myObjectHubDir", directive)
        sm.bindService(directive)

=== Added File Zope3/lib/python/Zope/App/OFS/Services/ObjectHub/tests/__init__.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""

Revision information:
$Id: __init__.py,v 1.1 2002/10/30 03:47:48 poster Exp $
"""





=== Added File Zope3/lib/python/Zope/App/OFS/Services/ObjectHub/tests/testHookedHubEvent.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""

Revision information:
$Id: testHookedHubEvent.py,v 1.1 2002/10/30 03:47:48 poster Exp $
"""

# in this version of these tests, we are no longer using a fake
# ObjectHub, which makes these tests less pure...but still useful
# as a test for both the events and the object hub for now.

import unittest, sys
from ObjectHubSetup import ObjectHubSetup
from Zope.App.OFS.Services.ObjectHub.HubEvent import \
     ObjectRegisteredHubEvent, ObjectUnregisteredHubEvent, \
     ObjectModifiedHubEvent, ObjectMovedHubEvent, \
     ObjectRemovedHubEvent
from Zope.App.Traversing import getPhysicalPathString

from Zope.Exceptions import NotFoundError
from Zope.ComponentArchitecture import getService
        
class AbstractTestHubEvent(ObjectHubSetup, unittest.TestCase):
    
    klass = None
    
    def setUp(self):
        ObjectHubSetup.setUp(self)
        self.object_hub = getService(self.rootFolder, "ObjectHub")
        self.obj = self.folder1_2_1
        self.hubid = self.object_hub.register(self.obj)
        self.location = getPhysicalPathString(self.obj)
        self.event = self.klass(self.object_hub,
                                self.hubid,
                                self.location,
                                self.obj)
        
    def testGetLocation(self):
        "Test getLocation method"
        self.assertEqual(self.event.location, self.location)
        
    def testGetHubId(self):
        "Test getHubId method"
        self.assertEqual(self.event.hubid, self.hubid)
    
    def testGetObject(self):
        "Test getObject method"
        self.assertEqual(self.event.object, self.obj)
    
class TestObjectRegisteredHubEvent(AbstractTestHubEvent):

    klass = ObjectRegisteredHubEvent

class TestEmptyObjectRegisteredHubEvent(TestObjectRegisteredHubEvent):
    
    def setUp(self):
        ObjectHubSetup.setUp(self)
        self.object_hub = getService(self.rootFolder, "ObjectHub")
        self.obj = self.folder1_2_1
        self.hubid = self.object_hub.register(self.obj)
        self.location = getPhysicalPathString(self.obj)
        self.event = self.klass(self.object_hub, self.hubid)

class TestObjectUnregisteredHubEvent(AbstractTestHubEvent):

    klass = ObjectUnregisteredHubEvent

class TestEmptyObjectUnregisteredHubEvent(TestObjectUnregisteredHubEvent):
    
    def setUp(self):
        ObjectHubSetup.setUp(self)
        self.object_hub = getService(self.rootFolder, "ObjectHub")
        self.obj = self.folder1_2_1
        self.hubid = self.object_hub.register(self.obj)
        self.location = getPhysicalPathString(self.obj)
        self.event = self.klass(self.object_hub, self.hubid, self.location)

class TestObjectModifiedHubEvent(AbstractTestHubEvent):

    klass = ObjectModifiedHubEvent

class TestEmptyObjectModifiedHubEvent(TestObjectModifiedHubEvent):
    
    def setUp(self):
        ObjectHubSetup.setUp(self)
        self.object_hub = getService(self.rootFolder, "ObjectHub")
        self.obj = self.folder1_2_1
        self.hubid = self.object_hub.register(self.obj)
        self.location = getPhysicalPathString(self.obj)
        self.event = self.klass(self.object_hub, self.hubid)

class TestObjectMovedHubEvent(AbstractTestHubEvent):
    
    fromLocation = '/old/location'
    
    def setUp(self):
        ObjectHubSetup.setUp(self)
        self.object_hub = getService(self.rootFolder, "ObjectHub")
        self.obj = self.folder1_2_1
        self.hubid = self.object_hub.register(self.obj)
        self.location = getPhysicalPathString(self.obj)
        self.event = self.klass(self.object_hub,
                                self.hubid,
                                self.fromLocation,
                                self.location,
                                self.obj)
    
    def testGetFromLocation(self):
        "Test from location"
        self.assertEqual(self.event.fromLocation, self.fromLocation)

    klass = ObjectMovedHubEvent

class TestEmptyObjectMovedHubEvent(TestObjectMovedHubEvent):
    
    def setUp(self):
        ObjectHubSetup.setUp(self)
        self.object_hub = getService(self.rootFolder, "ObjectHub")
        self.obj = self.folder1_2_1
        self.hubid = self.object_hub.register(self.obj)
        self.location = getPhysicalPathString(self.obj)
        self.event = self.klass(self.object_hub,
                                self.hubid,
                                self.fromLocation)

class TestObjectRemovedHubEvent(AbstractTestHubEvent):

    klass = ObjectRemovedHubEvent

# Hooked empty object removed not needed

def test_suite():
    return unittest.TestSuite((
        unittest.makeSuite(TestObjectRegisteredHubEvent),
        unittest.makeSuite(TestEmptyObjectRegisteredHubEvent),
        unittest.makeSuite(TestObjectUnregisteredHubEvent),
        unittest.makeSuite(TestEmptyObjectUnregisteredHubEvent),
        unittest.makeSuite(TestObjectModifiedHubEvent),
        unittest.makeSuite(TestEmptyObjectModifiedHubEvent),
        unittest.makeSuite(TestObjectMovedHubEvent),
        unittest.makeSuite(TestEmptyObjectMovedHubEvent),
        unittest.makeSuite(TestObjectRemovedHubEvent),
        ))

if __name__=='__main__':
    unittest.main(defaultTest='test_suite')


=== Added File Zope3/lib/python/Zope/App/OFS/Services/ObjectHub/tests/testHubEvent.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""

Revision information:
$Id: testHubEvent.py,v 1.1 2002/10/30 03:47:48 poster Exp $
"""

import unittest, sys

from Zope.App.OFS.Services.ObjectHub.HubEvent import \
     ObjectRegisteredHubEvent, ObjectUnregisteredHubEvent, \
     ObjectModifiedHubEvent, ObjectMovedHubEvent, \
     ObjectRemovedHubEvent

from Zope.Exceptions import NotFoundError

class DummyObjectHub:

    def __init__(self, ruid, obj, location):
        self.ruid = ruid
        self.obj = obj
        self.location = location
    
    
    def getObject(self, ruid):
        if ruid==self.ruid:
            return self.obj
            
        raise NotFoundError
    
    def getLocation(self, ruid):
        if ruid==self.ruid:
            return self.location
            
        raise NotFoundError
        
        
class AbstractTestHubEvent(unittest.TestCase):
    
    location = '/some/location'
    hubid = 23
    obj = object()
    klass = None
    
    def setUp(self):
        self.hub = DummyObjectHub(self.hubid, self.obj, self.location)
        self.event = self.klass(self.hub, self.hubid, self.location, self.obj)
    
    def testGetHub(self):
        self.assertEqual(self.event.hub, self.hub)
        
    def testGetLocation(self):
        self.assertEqual(self.event.location, self.location)
        
    def testGetHubId(self):
        "Test hubid"
        self.assertEqual(self.event.hubid, self.hubid)
    
    def testGetObject(self):
        self.assertEqual(self.event.object, self.obj)
    
class TestObjectRegisteredHubEvent(AbstractTestHubEvent):

    klass = ObjectRegisteredHubEvent

class TestEmptyObjectRegisteredHubEvent(TestObjectRegisteredHubEvent):
    
    def setUp(self):
        self.hub = DummyObjectHub(self.hubid, self.obj, self.location)
        self.event = self.klass(self.hub, self.hubid)

class TestObjectUnregisteredHubEvent(AbstractTestHubEvent):

    klass = ObjectUnregisteredHubEvent

class TestEmptyObjectUnregisteredHubEvent(unittest.TestCase):
    
    location = '/some/location'
    hubid = 23
    obj = object()
    klass = None
    
    klass = ObjectUnregisteredHubEvent
    
    def testRaisesTypeError(self):
        self.assertRaises(TypeError,
                          self.klass,
                          DummyObjectHub(self.hubid,
                                         self.obj,
                                         self.location),
                          self.hubid)

class TestObjectModifiedHubEvent(AbstractTestHubEvent):

    klass = ObjectModifiedHubEvent

class TestEmptyObjectModifiedHubEvent(TestObjectModifiedHubEvent):
    
    def setUp(self):
        self.hub = DummyObjectHub(self.hubid, self.obj, self.location)
        self.event = self.klass(self.hub, self.hubid)

class TestObjectMovedHubEvent(AbstractTestHubEvent):
    
    fromLocation = '/old/location'
    
    def setUp(self):
        self.hub = DummyObjectHub(self.hubid, self.obj, self.location)
        self.event = self.klass(self.hub,
                                self.hubid,
                                self.fromLocation,
                                self.location,
                                self.obj)
    
    def testGetFromLocation(self):
        "Test from location"
        self.assertEqual(self.event.fromLocation, self.fromLocation)

    klass = ObjectMovedHubEvent

class TestEmptyObjectMovedHubEvent(TestObjectMovedHubEvent):
    
    def setUp(self):
        self.hub = DummyObjectHub(self.hubid, self.obj, self.location)
        self.event = self.klass(self.hub,
                                self.hubid,
                                self.fromLocation)

class TestObjectRemovedHubEvent(AbstractTestHubEvent):

    klass = ObjectRemovedHubEvent

class TestEmptyObjectRemovedHubEvent(TestEmptyObjectUnregisteredHubEvent):

    klass = ObjectRemovedHubEvent

def test_suite():
    return unittest.TestSuite((
        unittest.makeSuite(TestObjectRegisteredHubEvent),
        unittest.makeSuite(TestEmptyObjectRegisteredHubEvent),
        unittest.makeSuite(TestObjectUnregisteredHubEvent),
        unittest.makeSuite(TestEmptyObjectUnregisteredHubEvent),
        unittest.makeSuite(TestObjectModifiedHubEvent),
        unittest.makeSuite(TestEmptyObjectModifiedHubEvent),
        unittest.makeSuite(TestObjectMovedHubEvent),
        unittest.makeSuite(TestEmptyObjectMovedHubEvent),
        unittest.makeSuite(TestObjectRemovedHubEvent),
        unittest.makeSuite(TestEmptyObjectRemovedHubEvent),
        ))

if __name__=='__main__':
    unittest.main(defaultTest='test_suite')


=== Added File Zope3/lib/python/Zope/App/OFS/Services/ObjectHub/tests/testObjectHub.py === (435/535 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""testObjectHub

Revision information:
$Id: testObjectHub.py,v 1.1 2002/10/30 03:47:48 poster Exp $
"""

import unittest, sys
from ObjectHubSetup import ObjectHubSetup

from Zope.Event.IObjectEvent import IObjectAddedEvent, IObjectRemovedEvent
from Zope.Event.IObjectEvent import IObjectModifiedEvent, IObjectMovedEvent
from Zope.Event.ObjectEvent import ObjectAddedEvent, ObjectModifiedEvent
from Zope.Event.ObjectEvent import ObjectRemovedEvent, ObjectMovedEvent
from Zope.Event.ISubscriber import ISubscriber

from Zope.App.OFS.Services.ObjectHub.IObjectHub import ObjectHubError
from Zope.App.OFS.Services.ObjectHub.IHubEvent import \
     IObjectRemovedHubEvent, IObjectModifiedHubEvent, \
     IObjectMovedHubEvent, IObjectRegisteredHubEvent, \
     IObjectUnregisteredHubEvent

import Zope.App.OFS.Services.ObjectHub.HubEvent as HubIdObjectEvent

from Zope.Exceptions import NotFoundError
from types import StringTypes

from Zope.App.Traversing import locationAsUnicode

from Zope.ComponentArchitecture import getService, getServiceManager

# while these tests don't really test much of the placeful aspect of the
# object hub, they do at least test basic functionality.

# we'll need real tests of the placeful aspects, but for now a basic
# test happens simply by virtue of the testHubEvent module in this
# directory


[-=- -=- -=- 435 lines omitted -=- -=- -=-]

        
        hub.notify(moved_event)
        self.assertRaises(NotFoundError, hub.getHubId, location)
        self.assertRaises(NotFoundError, hub.getHubId, new_location)
        
        self.subscriber.verifyEventsReceived(self, [
                (IObjectMovedEvent, new_location),
                ])


    def testMovedToExistingLocation(self):
        """Test that moving to an existing location raises ObjectHubError.
        """
        hub = self.object_hub
        added_event = self.added_event
        added_event2 = self.added_new_location_event
        moved_event = self.moved_event
        location = self.location
        new_location = self.new_location
        
        hub.notify(added_event)
        hub.notify(added_event2)
        
        self.assertRaises(ObjectHubError, hub.notify, moved_event)
        
        self.subscriber.verifyEventsReceived(self, [
                (IObjectAddedEvent, location),
                (IObjectRegisteredHubEvent, None, location),
                (IObjectAddedEvent, new_location),
                (IObjectRegisteredHubEvent, None, new_location),
                (IObjectMovedEvent, new_location),
            ])

def test_suite():
    return unittest.TestSuite((
        unittest.makeSuite(TransmitObjectRemovedHubEventTest),
        unittest.makeSuite(TransmitObjectModifiedHubEventTest),
        unittest.makeSuite(TransmitObjectMovedHubEventTest),
        unittest.makeSuite(TransmitObjectRegisteredHubEventTest),
        unittest.makeSuite(TransmitObjectUnregisteredHubEventTest),
        unittest.makeSuite(TestRegistrationEvents),
        unittest.makeSuite(TestNoRegistration),
        unittest.makeSuite(TestObjectAddedEvent),
        unittest.makeSuite(TestObjectRemovedEvent),
        unittest.makeSuite(TestObjectModifiedEvent),
        unittest.makeSuite(TestObjectMovedEvent),
        ))

if __name__=='__main__':
    unittest.main(defaultTest='test_suite')