[Zope3-checkins] CVS: Zope3/src/zope/app/workflow/tests - __init__.py:1.4 directive_helpers.py:1.1 test_definition.py:1.1 test_directives.py:1.1 test_importexport.py:1.1 test_instance.py:1.1 test_service.py:1.1 workflowsetup.py:1.1

Ulrich Eck ueck@net-labs.de
Thu, 8 May 2003 13:27:21 -0400


Update of /cvs-repository/Zope3/src/zope/app/workflow/tests
In directory cvs.zope.org:/tmp/cvs-serv7538/src/zope/app/workflow/tests

Added Files:
	__init__.py directive_helpers.py test_definition.py 
	test_directives.py test_importexport.py test_instance.py 
	test_service.py workflowsetup.py 
Log Message:
Finally got it into Zope3:

Workflow has arrived!

This is a merge of the workflow package that was separately developed
at /Packages3/workflow.

Please do a

cvs update -dPA 

to ensure that old files/directories are deleted, otherwise you'll 
probably encounter errors when trying to run zope3

if you have problems .. send me an email ueck <at> net-labs.de

Ulrich


=== Zope3/src/zope/app/workflow/tests/__init__.py 1.3 => 1.4 ===


=== Added File Zope3/src/zope/app/workflow/tests/directive_helpers.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

# Python 2 idiom: classes defined in this module without explicit bases
# become new-style classes.
__metaclass__ = type

from zope.app.interfaces.workflow import IProcessDefinition
from zope.app.interfaces.workflow import IGlobalProcessDefinitionImportExport
from zope.app.interfaces.workflow import IProcessDefinitionImportHandler
from zope.app.interfaces.workflow import IProcessDefinitionExportHandler



class ITestProcessDefinitionA(IProcessDefinition):
    """Marker interface 'A' for process definitions used by the tests."""

class ITestProcessDefinitionB(IProcessDefinition):
    """Marker interface 'B' for process definitions used by the tests."""


class TestImportHandlerA:
    """Import handler stub that accepts only data reading exactly 'A'."""

    __implements__ = IProcessDefinitionImportHandler

    def doImport(self, context, data):
        # Fixed marker string so callers can tell which handler ran.
        return 'Imported A'

    def canImport(self, context, data):
        # Reads the whole stream; the handler applies only to the text 'A'.
        content = data.read()
        return bool(content == 'A')


class TestImportHandlerB:
    """Import handler stub that accepts only data reading exactly 'B'."""

    __implements__ = IProcessDefinitionImportHandler

    def doImport(self, context, data):
        # Fixed marker string so callers can tell which handler ran.
        return 'Imported B'

    def canImport(self, context, data):
        # Reads the whole stream; the handler applies only to the text 'B'.
        content = data.read()
        return bool(content == 'B')



class TestExportHandlerA:
    """Export handler stub that always produces the marker 'Exported A'."""

    __implements__ = IProcessDefinitionExportHandler

    def doExport(self, context, process_definition):
        # Both arguments are ignored; only the marker string matters.
        marker = 'Exported A'
        return marker


class TestExportHandlerB:
    """Export handler stub that always produces the marker 'Exported B'."""

    __implements__ = IProcessDefinitionExportHandler

    def doExport(self, context, process_definition):
        # Both arguments are ignored; only the marker string matters.
        marker = 'Exported B'
        return marker


=== Added File Zope3/src/zope/app/workflow/tests/test_definition.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

import unittest

from zope.interface import Interface
from zope.interface.verify import verifyClass

from zope.app.interfaces.workflow import IProcessDefinition
from zope.app.workflow.definition import ProcessDefinition

from zope.app.interfaces.workflow import IProcessDefinitionElementContainer
from zope.app.workflow.definition import ProcessDefinitionElementContainer


class ProcessDefinitionTests(unittest.TestCase):
    """Basic checks for the ProcessDefinition class."""

    def testInterface(self):
        # The class must satisfy the IProcessDefinition contract.
        verifyClass(IProcessDefinition, ProcessDefinition)

    def testPDCreation(self):
        # Smoke test: creating an instance from a fresh definition must not
        # raise.  The returned instance is not inspected.
        definition = ProcessDefinition()
        definition.createProcessInstance(None)



from zope.app.container.tests.test_icontainer \
     import Test as ContainerTests

class ProcessDefinitionElementContainerTests(ContainerTests):
    """Container test suite plus an interface check for the element container."""

    def testIProcessDefinitionElementContainer(self):
        # Verify the class against its declared container interface.
        interface = IProcessDefinitionElementContainer
        implementation = ProcessDefinitionElementContainer
        verifyClass(interface, implementation)



def test_suite():
    """Return a suite holding the tests of both test cases in this module."""
    load = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite()
    for case in (ProcessDefinitionTests,
                 ProcessDefinitionElementContainerTests):
        suite.addTest(load(case))
    return suite

# Run the tests when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()


=== Added File Zope3/src/zope/app/workflow/tests/test_directives.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

import unittest
import sys
import os
from cStringIO import StringIO

from zope.app.tests.placelesssetup import PlacelessSetup

from zope.configuration.xmlconfig import xmlconfig, XMLConfig

import zope.configuration
import zope.app.workflow

from zope.app.workflow import globalimportexport
from zope.app.workflow.tests import directive_helpers

# Shorthand for the globalImportExport object whose _importers/_exporters
# mappings the tests below inspect.
gIE = globalimportexport.globalImportExport


# ZCML wrapper for a single directive: %s is replaced by the directive text.
# The workflow namespace is bound to the 'test' prefix.
template = """<zopeConfigure
   xmlns:test='http://namespaces.zope.org/workflow'>
   %s
   </zopeConfigure>"""


class Test(PlacelessSetup, unittest.TestCase):
    """Check that the workflow ZCML directives register their handlers."""

    def setUp(self):
        # Placeless setup first, then load metameta.zcml from
        # zope.configuration and meta.zcml from zope.app.workflow.
        PlacelessSetup.setUp(self)
        for filename, package in (('metameta.zcml', zope.configuration),
                                  ('meta.zcml', zope.app.workflow)):
            XMLConfig(filename, package)()

    def testImportHandler(self):
        directive = """
            <test:importHandler
             interface="zope.app.workflow.tests.directive_helpers.ITestProcessDefinitionA"
             factory="zope.app.workflow.tests.directive_helpers.TestImportHandlerA"
            />
            """
        xmlconfig(StringIO(template % directive))

        # The factory must now be stored under the interface it was
        # declared for.
        registered = gIE._importers.get(
            directive_helpers.ITestProcessDefinitionA)
        self.assertEqual(directive_helpers.TestImportHandlerA, registered)

    def testExportHandler(self):
        directive = """
            <test:exportHandler
             interface="zope.app.workflow.tests.directive_helpers.ITestProcessDefinitionA"
             factory="zope.app.workflow.tests.directive_helpers.TestExportHandlerA"
            />
            """
        xmlconfig(StringIO(template % directive))

        # The factory must now be stored under the interface it was
        # declared for.
        registered = gIE._exporters.get(
            directive_helpers.ITestProcessDefinitionA)
        self.assertEqual(directive_helpers.TestExportHandlerA, registered)


        

def test_suite():
    """Return a suite with every test method of ``Test``."""
    return unittest.TestLoader().loadTestsFromTestCase(Test)

# Run the suite with a text runner when executed directly as a script.
if __name__=='__main__':
    unittest.TextTestRunner().run(test_suite())


=== Added File Zope3/src/zope/app/workflow/tests/test_importexport.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

import unittest
import sys
import os
from cStringIO import StringIO

from zope.app.tests.placelesssetup import PlacelessSetup

from zope.app.workflow import globalimportexport
from zope.app.workflow.tests import directive_helpers

# Short aliases used by the classes and tests below.
gIE = globalimportexport.globalImportExport
dh = directive_helpers

class PDA:
    """Empty stand-in object declaring ITestProcessDefinitionA."""

    __implements__ = dh.ITestProcessDefinitionA

class PDB:
    """Empty stand-in object declaring ITestProcessDefinitionB."""

    __implements__ = dh.ITestProcessDefinitionB




class Test(PlacelessSetup, unittest.TestCase):
    """Exercise import/export dispatch on the global import/export object."""

    def setUp(self):
        PlacelessSetup.setUp(self)
        # Register the A and B stub handlers, importers first.
        importers = (
            (dh.ITestProcessDefinitionA, dh.TestImportHandlerA),
            (dh.ITestProcessDefinitionB, dh.TestImportHandlerB),
            )
        exporters = (
            (dh.ITestProcessDefinitionA, dh.TestExportHandlerA),
            (dh.ITestProcessDefinitionB, dh.TestExportHandlerB),
            )
        for interface, handler in importers:
            gIE.addImportHandler(interface, handler)
        for interface, handler in exporters:
            gIE.addExportHandler(interface, handler)

    def testImportHandler(self):
        # Each payload must be routed to the handler that accepts it.
        for payload, expected in (('A', 'Imported A'),
                                  ('B', 'Imported B')):
            self.assertEqual(gIE.importProcessDefinition(None, payload),
                             expected)

        # No registered handler accepts 'C'.
        self.assertRaises(ValueError, gIE.importProcessDefinition, None, 'C')

    def testExportHandler(self):
        # The exporter is chosen by the interface the object declares.
        for definition, expected in ((PDA(), 'Exported A'),
                                     (PDB(), 'Exported B')):
            self.assertEqual(gIE.exportProcessDefinition(None, definition),
                             expected)
        

        

def test_suite():
    """Return a suite with every test method of ``Test``."""
    return unittest.TestLoader().loadTestsFromTestCase(Test)

# Run the suite with a text runner when executed directly as a script.
if __name__=='__main__':
    unittest.TextTestRunner().run(test_suite())



=== Added File Zope3/src/zope/app/workflow/tests/test_instance.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

import unittest
from zope.interface.verify import verifyClass

from zope.app.interfaces.annotation import IAnnotations
from zope.app.tests.placelesssetup import PlacelessSetup

from zope.app.interfaces.workflow import IProcessInstance
from zope.app.workflow.instance import ProcessInstance

from zope.app.interfaces.workflow import IProcessInstanceContainer
from zope.app.workflow.instance import ProcessInstanceContainerAdapter, WFKey

class TestAnnotations(dict):
    """Plain dict that declares IAnnotations, used as annotation storage."""

    __implements__ = IAnnotations

class DummyInstance:
    """Featureless placeholder object stored in the container under test."""


class ProcessInstanceTests(unittest.TestCase):
    """Interface conformance check for ProcessInstance."""

    def testInterface(self):
        # The class must satisfy the IProcessInstance contract.
        interface, implementation = IProcessInstance, ProcessInstance
        verifyClass(interface, implementation)



class ProcessInstanceContainerAdapterTests(PlacelessSetup, unittest.TestCase):
    """Checks for the annotation-backed process instance container adapter."""

    def testInterface(self):
        # The adapter class must satisfy IProcessInstanceContainer.
        verifyClass(IProcessInstanceContainer,
                    ProcessInstanceContainerAdapter)

    def testAdapter(self):
        annotations = TestAnnotations()
        instance = DummyInstance()
        container = ProcessInstanceContainerAdapter(annotations)

        # Constructing the adapter leaves exactly one key, WFKey, in the
        # annotations, and the container starts out empty.
        self.assertEqual(annotations.keys(), [WFKey])
        self.assertEqual(len(container), 0)
        self.assertEqual(container.keys(), [])
        self.assertEqual(container.items(), [])
        self.assertEqual(container.values(), [])
        self.assertEqual(container.get('nothing', 1), 1)
        # A non-string name is rejected.
        self.assertRaises(TypeError, container.setObject, 123, None)

        # Store one object and read it back through every accessor.
        container.setObject('dummy', instance)
        self.assertEqual(len(container), 1)
        self.assertEqual(container.keys(), ['dummy'])
        self.assertEqual(container.values(), [instance])
        self.assertEqual(container.items(), [('dummy', instance)])
        self.assertEqual(container['dummy'], instance)
        self.assertEqual(container.get('dummy', 1), instance)
        self.failUnless('dummy' in container)

        # Deleting the entry empties the container again.
        del container['dummy']

        self.assertEqual(len(container), 0)
        self.failIf('dummy' in container)
        
        





def test_suite():
    """Return a suite holding the tests of both test cases in this module."""
    load = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite()
    for case in (ProcessInstanceTests,
                 ProcessInstanceContainerAdapterTests):
        suite.addTest(load(case))
    return suite

# Run the tests when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()


=== Added File Zope3/src/zope/app/workflow/tests/test_service.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

Revision information:
$Id: test_service.py,v 1.1 2003/05/08 17:27:20 jack-e Exp $
"""

import unittest

from zope.interface import Interface
from zope.interface.verify import verifyClass

from zope.app.traversing import traverse
from zope.app.container.zopecontainer import ZopeContainerAdapter
from zope.app.interfaces.annotation import IAttributeAnnotatable
from zope.app.interfaces.services.configuration \
     import IUseConfigurable

from zope.app.interfaces.services.configuration \
     import Active, Unregistered, Registered

from zope.app.workflow.tests.workflowsetup import WorkflowSetup
from zope.app.interfaces.workflow \
     import IWorkflowService, IProcessDefinition
from zope.app.workflow.service import WorkflowService
from zope.app.workflow.service import ProcessDefinitionConfiguration

# define and create dummy ProcessDefinition (PD) for tests
class DummyProcessDefinition:
    """Numbered stand-in process definition used by the service tests.

    The number given at construction shows up in ``str()`` of the object
    and in the value returned by ``createProcessInstance``.
    """

    __implements__ = (IProcessDefinition,
                      IAttributeAnnotatable,
                      IUseConfigurable)

    def __init__(self, n):
        self.n = n

    def createProcessInstance(self, definition_name):
        # The definition name is ignored; only the number identifies us.
        number = self.n
        return 'PI #%d' % number

    def __str__(self):
        number = self.n
        return 'PD #%d' % number


def sort(l):
    """Return a new, sorted list holding ``str()`` of every item in *l*.

    The input sequence itself is left untouched.
    """
    names = list(map(str, l))
    names.sort()
    return names

class WorkflowServiceTests(WorkflowSetup, unittest.TestCase):
    """Tests for WorkflowService lookups across two nested local services.

    ``self.service`` and ``self.service1`` (plus ``self.default``,
    ``self.cm``, ``self.default1`` and ``self.cm1``) are provided by
    WorkflowSetup.setUp; this class adds the process definitions and their
    configurations.
    """

    def setUp(self):
        WorkflowSetup.setUp(self)
        # setup ProcessDefinitions
        self.default.setObject('pd1', DummyProcessDefinition(1))
        self.default.setObject('pd2', DummyProcessDefinition(2))

        # NOTE: statement order matters here.  The configurations are added
        # under the empty name and then looked up as '2', '3', '4' --
        # presumably the configuration manager assigns sequential keys, with
        # '1' already taken by the service configuration added in
        # WorkflowSetup.setUp.
        self.cm.setObject('', ProcessDefinitionConfiguration('definition1',
                                '/++etc++site/default/pd1'))
        traverse(self.default.getConfigurationManager(), '2').status = Active
        self.cm.setObject('', ProcessDefinitionConfiguration('definition2',
                                '/++etc++site/default/pd2'))
        traverse(self.default.getConfigurationManager(), '3').status = Active
        # definition3 is only Registered, not Active.
        self.cm.setObject('', ProcessDefinitionConfiguration('definition3',
                                '/++etc++site/default/pd1'))
        traverse(self.default.getConfigurationManager(), '4').status = Registered
        # Now self.service has definition1 and definition2 available
        # and knows about definition3

        self.default1.setObject('pd3', DummyProcessDefinition(3))
        self.default1.setObject('pd4', DummyProcessDefinition(4))

        # Same key scheme as above, this time in folder1's configuration
        # manager.
        self.cm1.setObject('', ProcessDefinitionConfiguration('definition1',
                            '/folder1/++etc++site/default/pd3'))
        traverse(self.default1.getConfigurationManager(), '2').status = Active
        self.cm1.setObject('', ProcessDefinitionConfiguration('definition4',
                            '/folder1/++etc++site/default/pd4'))
        traverse(self.default1.getConfigurationManager(), '3').status = Active
        # Now self.service1 overrides definition1, adds new definition4 available,
        # and inherits definition2 from self.service

    def testInterface(self):
        # The service class must satisfy the IWorkflowService contract.
        verifyClass(IWorkflowService, WorkflowService)

    def testGetProcessDefiniton(self):
        # Outer service: the two active definitions resolve; the merely
        # registered definition3 and the inner-only definition4 raise.
        self.assertEqual('PD #1', str(self.service.getProcessDefinition('definition1')))
        self.assertEqual('PD #2', str(self.service.getProcessDefinition('definition2')))
        self.assertRaises(KeyError, self.service.getProcessDefinition, 'definition3')
        self.assertRaises(KeyError, self.service.getProcessDefinition, 'definition4')

        # Inner service: definition1 is overridden (PD #3), definition2 is
        # inherited, definition4 is local, unknown names raise.
        self.assertEqual('PD #3', str(self.service1.getProcessDefinition('definition1')))
        self.assertEqual('PD #2', str(self.service1.getProcessDefinition('definition2')))
        self.assertRaises(KeyError, self.service1.getProcessDefinition, 'definition3')
        self.assertEqual('PD #4', str(self.service1.getProcessDefinition('definition4')))
        self.assertRaises(KeyError, self.service1.getProcessDefinition, 'definition5')

    def testQueryProcessDefinition(self):
        # Same lookups as testGetProcessDefiniton, but misses yield None or
        # the supplied default ('xx') instead of raising.
        self.assertEqual('PD #1', str(self.service.queryProcessDefinition('definition1')))
        self.assertEqual('PD #2', str(self.service.queryProcessDefinition('definition2')))
        self.assertEqual(None, self.service.queryProcessDefinition('definition3'))
        self.assertEqual('xx', self.service.queryProcessDefinition('definition3', 'xx'))
        self.assertEqual(None, self.service.queryProcessDefinition('definition4'))
        self.assertEqual('xx', self.service.queryProcessDefinition('definition4', 'xx'))

        self.assertEqual('PD #3', str(self.service1.queryProcessDefinition('definition1')))
        self.assertEqual('PD #2', str(self.service1.queryProcessDefinition('definition2')))
        self.assertEqual(None, self.service1.queryProcessDefinition('definition3'))
        self.assertEqual('xx', self.service1.queryProcessDefinition('definition3', 'xx'))
        self.assertEqual('PD #4', str(self.service1.queryProcessDefinition('definition4')))
        self.assertEqual(None, self.service1.queryProcessDefinition('definition5'))
        self.assertEqual('xx', self.service1.queryProcessDefinition('definition5', 'xx'))


    def testGetProcessDefinitonNames(self):
        # Names are compared after sorting, so the order returned by the
        # service does not matter.
        self.assertEqual(['definition1', 'definition2'],
                         sort(self.service.getProcessDefinitionNames()))
        self.assertEqual(['definition1', 'definition2', 'definition4'],
                         sort(self.service1.getProcessDefinitionNames()))

    def testCreateProcessInstance(self):
        # The dummy definitions return 'PI #<n>', which shows which
        # definition the service delegated to.
        pi = self.service.createProcessInstance('definition1')
        self.assertEqual(pi, 'PI #1')

        pi = self.service1.createProcessInstance('definition4')
        self.assertEqual(pi, 'PI #4')

def test_suite():
    """Return a suite wrapping all tests of ``WorkflowServiceTests``."""
    load = unittest.defaultTestLoader.loadTestsFromTestCase
    return unittest.TestSuite((load(WorkflowServiceTests),))

# Run the tests when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()


=== Added File Zope3/src/zope/app/workflow/tests/workflowsetup.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
Setup for Placeful Workflow Tests
Revision information:
$Id: workflowsetup.py,v 1.1 2003/05/08 17:27:20 jack-e Exp $
"""
from zope.app.traversing import traverse, getPath
from zope.app.container.zopecontainer import ZopeContainerAdapter
from zope.app.services.service import ServiceManager
from zope.app.services.service import ServiceConfiguration

from zope.component import getService, getServiceManager
from zope.app.services.servicenames import Roles, Permissions, Adapters
from zope.app.services.servicenames import Authentication, Workflows

from zope.app.interfaces.security import IAuthenticationService
from zope.app.interfaces.security import IRoleService
from zope.app.interfaces.security import IPermissionService
from zope.app.security.registries.principalregistry \
     import principalRegistry
from zope.app.security.registries.permissionregistry \
     import permissionRegistry

from zope.app.interfaces.dependable import IDependable
from zope.app.services.tests.placefulsetup \
     import PlacefulSetup
from zope.app.interfaces.annotation import IAnnotatable
from zope.app.interfaces.annotation import IAttributeAnnotatable
from zope.app.attributeannotations import AttributeAnnotations
from zope.app.interfaces.annotation import IAnnotations
from zope.app.interfaces.dependable import IDependable
from zope.app.dependable import Dependable
from zope.app.interfaces.services.configuration \
     import IUseConfiguration, IUseConfigurable
from zope.app.services.configuration import UseConfiguration
from zope.component.adapter import provideAdapter

from zope.app.interfaces.services.configuration \
     import Active, Unregistered, Registered

from zope.app.workflow.service import WorkflowService




class WorkflowServiceForTests(WorkflowService):
    """WorkflowService that additionally declares IAttributeAnnotatable."""

    __implements__ = (WorkflowService.__implements__,
                      IAttributeAnnotatable)




class WorkflowSetup(PlacefulSetup):
    """Placeful test fixture that installs two nested local workflow services.

    After ``setUp`` the following attributes are available:

    ``root_sm``
        the service manager returned by ``getServiceManager(None)``
    ``sm``, ``default``, ``cm``, ``service``
        service manager, 'default' package, configuration manager and
        workflow service set up on the root folder
    ``sm1``, ``default1``, ``cm1``, ``service1``
        the same objects set up on ``folder1``

    The ``setup*Service`` helpers are not called by ``setUp``; tests that
    need one of those services invoke them explicitly.
    """

    def setUp(self):
        """Build the folder tree and activate both workflow services."""
        PlacefulSetup.setUp(self)

        self.root_sm = getServiceManager(None)

        # Adapters registered for the annotation, dependency and
        # use-configuration interfaces.
        provideAdapter(IAttributeAnnotatable,
                       IAnnotations, AttributeAnnotations)
        provideAdapter(IAnnotatable, IDependable, Dependable)
        provideAdapter(IUseConfigurable, IUseConfiguration, UseConfiguration)

        # Set up a local workflow service
        self.buildFolders()
        self.rootFolder.setServiceManager(ServiceManager())

        service_name = 'workflow_srv'
        self.sm = traverse(self.rootFolder, '++etc++site')
        self.default = default = traverse(self.sm, 'default')
        default.setObject(service_name, WorkflowServiceForTests())

        # XXX change to unique names
        self.service = traverse(default, service_name)
        path = "%s/%s" % (getPath(default), service_name)
        configuration = ServiceConfiguration(Workflows, path, self.rootFolder)
        self.cm = default.getConfigurationManager()
        # Added under the empty name, then looked up as '1' to activate it.
        self.cm.setObject('', configuration)
        traverse(self.cm, '1').status = Active

        # Set up a more local workflow service
        folder1 = traverse(self.rootFolder, 'folder1')
        folder1.setServiceManager(ServiceManager())

        service_name1 = 'workflow_srv1'
        self.sm1 = traverse(folder1, '++etc++site')
        self.default1 = default1 = traverse(self.sm1, 'default')
        default1.setObject(service_name1, WorkflowServiceForTests())

        # XXX change to unique name
        self.service1 = traverse(self.default1, service_name1)
        path1 = "%s/%s" % (getPath(default1), service_name1)
        configuration1 = ServiceConfiguration(Workflows, path1, self.rootFolder)
        self.cm1 = default1.getConfigurationManager()
        self.cm1.setObject('', configuration1)
        traverse(self.cm1, '1').status = Active


    def setupAuthService(self):
        """Define and provide the authentication service on ``root_sm``.

        Returns the service as looked up from the root folder.
        """
        self.root_sm.defineService(Authentication, IAuthenticationService)
        self.root_sm.provideService(Authentication, principalRegistry)
        return getService(self.rootFolder, Authentication)


    def setupRoleService(self):
        """Define and provide the role service on ``root_sm``.

        Returns the service as looked up from the root folder.
        """
        # Imported here because roleRegistry is not among this module's
        # top-level imports; without it this method raised NameError.
        from zope.app.security.registries.roleregistry import roleRegistry

        self.root_sm.defineService(Roles, IRoleService)
        self.root_sm.provideService(Roles, roleRegistry)
        return getService(self.rootFolder, Roles)

    def setupPermissionService(self):
        """Define and provide the permission service on ``root_sm``.

        Returns the service as looked up from the root folder.
        """
        self.root_sm.defineService(Permissions, IPermissionService)
        self.root_sm.provideService(Permissions, permissionRegistry)
        return getService(self.rootFolder, Permissions)