[Zope-CVS] CVS: Products/PluggableAuthService/plugins - ChallengeProtocolChooser.py:1.1.2.1 RequestTypeSniffer.py:1.1.2.1

Sidnei da Silva sidnei at enfoldsystems.com
Sat Aug 13 00:34:34 EDT 2005


Update of /cvs-repository/Products/PluggableAuthService/plugins
In directory cvs.zope.org:/tmp/cvs-serv7291/plugins

Added Files:
      Tag: sidnei-challenge-protocol-chooser
	ChallengeProtocolChooser.py RequestTypeSniffer.py 
Log Message:

- Implement Challenge Protocol Chooser, with functional doctests


=== Added File Products/PluggableAuthService/plugins/ChallengeProtocolChooser.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
# Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
# distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Classes: ChallengeProtocolChooser

$Id: ChallengeProtocolChooser.py,v 1.1.2.1 2005/08/13 04:34:33 sidnei Exp $
"""

from Acquisition import aq_parent
from AccessControl import ClassSecurityInfo
from BTrees.OOBTree import OOBTree
from Globals import InitializeClass

from Products.PageTemplates.PageTemplateFile import PageTemplateFile

from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.interfaces.plugins \
    import IRequestTypeSniffer
from Products.PluggableAuthService.interfaces.plugins \
    import IChallengeProtocolChooser
from Products.PluggableAuthService.interfaces.plugins \
     import IChallengePlugin

from Products.PluggableAuthService.interfaces.request \
    import IBrowserRequest, IWebDAVRequest, IFTPRequest, IXMLRPCRequest

# Registry of known request types.
#   _request_types     -- (label, interface) pairs in registration order
#   _request_type_bmap -- reverse lookup: interface -> label
_request_types = ()
_request_type_bmap = {}

def registerRequestType(label, iface):
    """ Register `iface` as a request type known under `label`.

    The pair is appended to the `_request_types` tuple (which is rebound
    to a new tuple) and the interface -> label entry is recorded in
    `_request_type_bmap`.
    """
    global _request_types
    _request_types = _request_types + ((label, iface),)
    _request_type_bmap[iface] = label

def listRequestTypesLabels():
    """ Return the labels of all registered request types as a new list.

    A real list is always returned (never a dictionary view), because
    the result is sorted in place by the code that consumes it.
    """
    return list(_request_type_bmap.values())

# Add form for the plugin, rendered from the 'www/cpcAdd' page template.
manage_addChallengeProtocolChooserForm = PageTemplateFile(
    'www/cpcAdd',
    globals(),
    __name__='manage_addChallengeProtocolChooserForm'
    )

def addChallengeProtocolChooserPlugin( dispatcher, id, title=None,
                                       mapping=None, REQUEST=None ):
    """ Add a ChallengeProtocolChooserPlugin to a Pluggable Auth Service.

    Creates the plugin, stores it on `dispatcher` under the plugin's own
    id and, when a REQUEST is supplied, redirects the response to the
    dispatcher's 'manage_workspace' with a confirmation message.
    """
    plugin = ChallengeProtocolChooser(id, title=title, mapping=mapping)
    dispatcher._setObject(plugin.getId(), plugin)

    if REQUEST is None:
        return

    target = ('%s/manage_workspace'
              '?manage_tabs_message='
              'ChallengeProtocolChooser+added.'
              % dispatcher.absolute_url())
    REQUEST['RESPONSE'].redirect(target)


class ChallengeProtocolChooser( BasePlugin ):

    """ PAS plugin for choosing challenge protocols based on request type.

    The plugin keeps an OOBTree (`_map`) keyed by request type label (the
    labels recorded in the module-level `_request_type_bmap`) whose values
    are the protocols configured for that kind of request.  A label with
    no entry is presented as '(any)' in the edit form.
    """
    __implements__ = ( IChallengeProtocolChooser, )

    meta_type = 'Challenge Protocol Chooser Plugin'

    security = ClassSecurityInfo()

    # NOTE(review): none of the manage_* methods below carries an explicit
    # security declaration -- confirm which permission should guard them.
    manage_options = (({'label': 'Mapping', 
                        'action': 'manage_editProtocolMapping'
                        },
                       )
                      + BasePlugin.manage_options
                      )

    def __init__(self, id, title=None, mapping=None):
        """ Set id and title and optionally seed the protocol mapping.

        o `mapping`, when given, is passed to
          `manage_updateProtocolMapping`.
        """
        self._id = self.id = id
        self.title = title
        self._map = OOBTree()
        if mapping is not None:
            self.manage_updateProtocolMapping(mapping=mapping)

    # The declaration has to name the method exactly; it previously said
    # 'chooseProtocol', which does not exist on this class.
    security.declarePrivate('chooseProtocols')
    def chooseProtocols(self, request):
        """ Return the protocols configured for the type of `request`.

        o Each IRequestTypeSniffer plugin is asked in turn; the first one
          that reports a request type (anything other than None) decides
          the result.

        o Returns None when no sniffer recognizes the request, when the
          sniffed type has no registered label, or when nothing is
          configured for that label.
        """
        pas_instance = self._getPAS()
        plugins = pas_instance._getOb('plugins')

        sniffers = plugins.listPlugins( IRequestTypeSniffer )

        for sniffer_id, sniffer in sniffers:
            request_type = sniffer.sniffRequestType(request)
            if request_type is not None:
                return self._getProtocolsFor(request_type)

        return None

    def _getProtocolsFor(self, request_type):
        """ Translate a request type interface into its configured protocols.

        o Returns None if the interface has no label in
          `_request_type_bmap` or the label has no entry in `_map`.
        """
        label = _request_type_bmap.get(request_type, None)
        if label is None:
            return None
        return self._map.get(label, None)

    def _listProtocols(self):
        """ Return the distinct protocols of the IChallengePlugin plugins.

        o A plugin without a `protocol` attribute is represented by its
          plugin id.

        o Order follows the plugin listing; duplicates are dropped.
        """
        pas_instance = self._getPAS()
        plugins = pas_instance._getOb('plugins')

        challengers = plugins.listPlugins( IChallengePlugin )
        found = []

        for challenger_id, challenger in challengers:
            protocol = getattr(challenger, 'protocol', challenger_id)
            if protocol not in found:
                found.append(protocol)

        return found

    manage_editProtocolMappingForm = PageTemplateFile(
        'www/cpcEdit', globals(),
        __name__='manage_editProtocolMappingForm')

    def manage_editProtocolMapping(self, REQUEST=None):
        """ Edit Protocol Mapping

        Renders `manage_editProtocolMappingForm` with `info`: one dict per
        request type label (sorted), each holding the label and a list of
        `settings` dicts ('label', 'selected', 'value').  The first
        setting is always '(any)', selected when nothing is configured
        for that label.
        """
        info = []
        available_protocols = self._listProtocols()

        # Copy into a real list before sorting in place, so this does not
        # depend on what kind of sequence the registry hands back.
        request_types = list(listRequestTypesLabels())
        request_types.sort()

        for label in request_types:
            protocols = self._map.get(label, None)

            settings = [{'label': '(any)',
                         'selected': not protocols,
                         'value': '',
                         }]
            for protocol in available_protocols:
                selected = bool(protocols) and protocol in protocols
                settings.append({'label': protocol,
                                 'selected': selected,
                                 'value': protocol,
                                 })

            info.append({'label': label,
                         'settings': settings,
                         })

        return self.manage_editProtocolMappingForm(info=info, REQUEST=REQUEST)

    def manage_updateProtocolMapping(self, mapping, REQUEST=None):
        """ Update mapping of Request Type to Protocols

        o `mapping` maps request type labels to sequences of protocols.
          Empty items are filtered out of each sequence.

        o A label whose filtered sequence is empty is removed from the
          stored mapping; any other label is stored with the filtered
          sequence.

        o When a REQUEST is supplied the response is redirected back to
          'manage_editProtocolMapping' with a confirmation message.
        """
        for key, value in mapping.items():
            value = filter(None, value)
            if not value:
                if self._map.has_key(key):
                    del self._map[key]
            else:
                self._map[key] = value

        if REQUEST is not None:
            REQUEST['RESPONSE'].redirect(
                '%s/manage_editProtocolMapping'
                '?manage_tabs_message='
                'Protocol+Mappings+Changed.'
                % self.absolute_url())

# Register the request types known out of the box.  Each pair is handed
# to registerRequestType() in the order listed here.
for label, iface in (('Browser', IBrowserRequest),
                     ('WebDAV', IWebDAVRequest),
                     ('FTP', IFTPRequest),
                     ('XML-RPC', IXMLRPCRequest)):
    registerRequestType(label, iface)


=== Added File Products/PluggableAuthService/plugins/RequestTypeSniffer.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights
# Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this
# distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" Classes: RequestTypeSniffer

$Id: RequestTypeSniffer.py,v 1.1.2.1 2005/08/13 04:34:33 sidnei Exp $
"""

from Acquisition import aq_parent
from AccessControl import ClassSecurityInfo
from Globals import InitializeClass
from ZServer.FTPRequest import FTPRequest
from ZPublisher import xmlrpc

from Products.PageTemplates.PageTemplateFile import PageTemplateFile

from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.interfaces.plugins \
    import IRequestTypeSniffer

from Products.PluggableAuthService.interfaces.request \
    import IBrowserRequest, IWebDAVRequest, IFTPRequest, IXMLRPCRequest

# Registered (interface, sniffer function) pairs, in registration order.
_sniffers = ()

def registerSniffer(iface, func):
    """ Register `func` as the sniffer for request type `iface`.

    `_sniffers` is rebound to a new tuple with the (iface, func) pair
    appended at the end.
    """
    global _sniffers
    _sniffers = _sniffers + ((iface, func),)

# Add form for the plugin, rendered from the 'www/rtsAdd' page template.
manage_addRequestTypeSnifferForm = PageTemplateFile(
    'www/rtsAdd',
    globals(),
    __name__='manage_addRequestTypeSnifferForm'
    )

def addRequestTypeSnifferPlugin( dispatcher, id, title=None, REQUEST=None ):
    """ Add a RequestTypeSnifferPlugin to a Pluggable Auth Service.

    Creates the plugin, stores it on `dispatcher` under the plugin's own
    id and, when a REQUEST is supplied, redirects the response to the
    dispatcher's 'manage_workspace' with a confirmation message.
    """
    plugin = RequestTypeSniffer(id, title)
    dispatcher._setObject(plugin.getId(), plugin)

    if REQUEST is None:
        return

    target = ('%s/manage_workspace'
              '?manage_tabs_message='
              'RequestTypeSniffer+added.'
              % dispatcher.absolute_url())
    REQUEST['RESPONSE'].redirect(target)


class RequestTypeSniffer( BasePlugin ):

    """ PAS plugin that determines a request's type by running the
    sniffer functions registered in the module-level `_sniffers`.
    """
    __implements__ = ( IRequestTypeSniffer, )

    meta_type = 'Request Type Sniffer Plugin'

    security = ClassSecurityInfo()

    def __init__(self, id, title=None):
        """ Store the plugin id (as both `_id` and `id`) and the title.
        """
        self._id = id
        self.id = id
        self.title = title

    security.declarePrivate('sniffRequestType')
    def sniffRequestType(self, request):
        """ Return the interface of the last registered sniffer that
        accepts `request`, or None when no sniffer accepts it.

        o Every registered sniffer is called, in registration order.
        """
        matches = [iface for iface, sniff in _sniffers if sniff( request )]
        if not matches:
            return None
        return matches[-1]

# Most of the sniffing code below has been inspired by
# similar tests found in BaseRequest, HTTPRequest and ZServer
def webdavSniffer(request):
    """ Return True if `request` looks like a WebDAV request, else None.

    A request is taken for WebDAV when any of these holds:

    o 'WEBDAV_SOURCE_PORT' is set to a true value;

    o the upper-cased 'REQUEST_METHOD' (default 'GET') is neither GET
      nor POST;

    o the method is GET and 'PATH_INFO' ends with 'manage_DAVget'.
    """
    source_port = request.get('WEBDAV_SOURCE_PORT', None)
    verb = request.get('REQUEST_METHOD', 'GET').upper()
    path = request.get('PATH_INFO', '')

    looks_like_dav = (
        source_port
        or verb not in ('GET', 'POST')
        or (verb == 'GET' and path.endswith('manage_DAVget'))
        )
    if looks_like_dav:
        return True

registerSniffer(IWebDAVRequest, webdavSniffer)

def xmlrpcSniffer(request):
    """ Return True if `request` looks like an XML-RPC request, else None.

    The request qualifies when its upper-cased 'REQUEST_METHOD' (default
    'GET') is GET or POST and its 'RESPONSE' is an `xmlrpc.Response`.
    """
    response = request['RESPONSE']
    verb = request.get('REQUEST_METHOD', 'GET').upper()

    # Check the method first; the response type only matters for GET/POST.
    if verb not in ('GET', 'POST'):
        return None
    if isinstance(response, xmlrpc.Response):
        return True

registerSniffer(IXMLRPCRequest, xmlrpcSniffer)

def ftpSniffer(request):
    """ Return True if `request` is an `FTPRequest` instance, else None.
    """
    if not isinstance(request, FTPRequest):
        return None
    return True

registerSniffer(IFTPRequest, ftpSniffer)
    
def browserSniffer(request):
    """ Return False if the WebDAV, FTP or XML-RPC sniffer accepts
    `request` (tried in that order), True otherwise.
    """
    # Anything the more specific sniffers do not claim is very likely a
    # browser request.
    if (webdavSniffer(request)
        or ftpSniffer(request)
        or xmlrpcSniffer(request)):
        return False
    return True

registerSniffer(IBrowserRequest, browserSniffer)



More information about the Zope-CVS mailing list