[Zope-Checkins] SVN: Zope/trunk/lib/python/OFS/ Refactor unrestrictedTraverse

Casey Duncan casey at zope.com
Wed Jun 2 17:34:36 EDT 2004


Log message for revision 25205:
Refactor unrestrictedTraverse
- Reformat code, add docstrings, unobfuscate local var names
- Eliminate use of hasattr and simplify where possible
- Add comments
- Improve test coverage



-=-
Modified: Zope/trunk/lib/python/OFS/Traversable.py
===================================================================
--- Zope/trunk/lib/python/OFS/Traversable.py	2004-06-02 21:13:57 UTC (rev 25204)
+++ Zope/trunk/lib/python/OFS/Traversable.py	2004-06-02 21:34:36 UTC (rev 25205)
@@ -23,7 +23,7 @@
 
 NotFound = 'NotFound'
 
-_marker=[]
+_marker = object()
 
 class Traversable:
 
@@ -106,100 +106,130 @@
 
     unrestrictedTraverse__roles__=() # Private
     def unrestrictedTraverse(self, path, default=_marker, restricted=0):
+        """Lookup an object by path,
+        
+        path -- The path to the object. May be a sequence of strings or a slash
+        separated string. If the path begins with an empty path element
+        (i.e., an empty string or a slash) then the lookup is performed
+        from the application root. Otherwise, the lookup is relative to
+        self. Two dots (..) as a path element indicates an upward traversal
+        to the acquisition parent.
+        
+        default -- If provided, this is the value returned if the path cannot
+        be traversed for any reason (i.e., no object exists at that path or
+        the object is inaccessible).
+        
+        restricted -- If false (default) then no security checking is performed.
+        If true, then all of the objects along the path are validated with
+        the security machinery. Usually invoked using restrictedTraverse().
+        """
+        if not path:
+            return self
+        
+        _getattr = getattr
+        _hasattr = hasattr
+        _none = None
+        marker = _marker
 
-        if not path: return self
+        if isinstance(path, str):
+            # Unicode paths are not allowed
+            path = path.split('/')
+        else: 
+            path = list(path)
 
-        get=getattr
-        has=hasattr
-        N=None
-        M=_marker
-
-        if isinstance(path, str): path = path.split('/')
-        else: path=list(path)
-
-        REQUEST={'TraversalRequestNameStack': path}
+        REQUEST = {'TraversalRequestNameStack': path}
         path.reverse()
-        pop=path.pop
+        path_pop=path.pop
 
         if len(path) > 1 and not path[0]:
             # Remove trailing slash
             path.pop(0)
 
-        if restricted: securityManager=getSecurityManager()
-        else: securityManager=N
+        if restricted: 
+            securityManager = getSecurityManager()
+        else: 
+            securityManager = _none
 
         if not path[-1]:
             # If the path starts with an empty string, go to the root first.
-            pop()
-            self=self.getPhysicalRoot()
-            if (restricted and not securityManager.validate(
-                None, None, None, self)):
+            path_pop()
+            self = self.getPhysicalRoot()
+            if (restricted 
+                and not securityManager.validate(None, None, None, self)):
                 raise Unauthorized, name
 
         try:
-            object = self
+            obj = self
             while path:
-                name=pop()
+                name = path_pop()
                 __traceback_info__ = path, name
 
                 if name[0] == '_':
                     # Never allowed in a URL.
                     raise NotFound, name
 
-                if name=='..':
-                    o=getattr(object, 'aq_parent', M)
-                    if o is not M:
-                        if (restricted and not securityManager.validate(
-                            object, object,name, o)):
+                if name == '..':
+                    next = aq_parent(obj)
+                    if next is not _none:
+                        if restricted and not securityManager.validate(
+                            obj, obj,name, next):
                             raise Unauthorized, name
-                        object=o
+                        obj = next
                         continue
 
-                t=get(object, '__bobo_traverse__', N)
-                if t is not N:
-                    o=t(REQUEST, name)
-
+                bobo_traverse = _getattr(obj, '__bobo_traverse__', _none)
+                if bobo_traverse is not _none:
+                    next = bobo_traverse(REQUEST, name)
                     if restricted:
-                        container = N
-                        if aq_base(o) is not o:
+                        if aq_base(next) is not next:
                             # The object is wrapped, so the acquisition
-                            # context determines the container.
-                            container = aq_parent(aq_inner(o))
-                        elif has(o, 'im_self'):
-                            container = o.im_self
-                        elif (has(get(object, 'aq_base', object), name)
-                              and get(object, name) == o):
-                            container = object
-                        if (not securityManager.validate(object,
-                                                         container, name, o)):
+                            # context is the container.
+                            container = aq_parent(aq_inner(next))
+                        elif _getattr(next, 'im_self', _none) is not _none:
+                            # Bound method, the bound instance
+                            # is the container
+                            container = next.im_self
+                        elif _getattr(aq_base(obj), name, marker) == next:
+                            # Unwrapped direct attribute of the object so
+                            # object is the container
+                            container = obj
+                        else:
+                            # Can't determine container
+                            container = _none
+                        if not securityManager.validate(
+                            obj, container, name, next):
                             raise Unauthorized, name
-
                 else:
                     if restricted:
-                        o = guarded_getattr(object, name, M)
+                        next = guarded_getattr(obj, name, marker)
                     else:
-                        o = get(object, name, M)
-                    if o is M:
+                        next = _getattr(obj, name, marker)
+                    if next is marker:
                         try:
-                            o=object[name]
+                            next=obj[name]
                         except AttributeError:
                             # Raise NotFound for easier debugging
+                            # instead of AttributeError: __getitem__
                             raise NotFound, name
-                        if (restricted and not securityManager.validate(
-                            object, object, N, o)):
+                        if restricted and not securityManager.validate(
+                            obj, obj, _none, next):
                             raise Unauthorized, name
 
-                object=o
+                obj = next
 
-            return object
+            return obj
 
-        except ConflictError: raise
+        except ConflictError:
+            raise
         except:
-            if default==_marker: raise
-            return default
+            if default is not marker:
+                return default
+            else:
+                raise
 
     restrictedTraverse__roles__=None # Public
     def restrictedTraverse(self, path, default=_marker):
+        # Traversal for use by trusted code; always enforces security
         return self.unrestrictedTraverse(path, default, restricted=1)
 
 def path2url(path):

Modified: Zope/trunk/lib/python/OFS/tests/testTraverse.py
===================================================================
--- Zope/trunk/lib/python/OFS/tests/testTraverse.py	2004-06-02 21:13:57 UTC (rev 25204)
+++ Zope/trunk/lib/python/OFS/tests/testTraverse.py	2004-06-02 21:34:36 UTC (rev 25205)
@@ -84,8 +84,20 @@
     def __bobo_traverse__(self, request, name):
         if name == 'bb_subitem':
             return BoboTraversable().__of__(self)
+        elif name == 'bb_method':
+            return self.bb_method
+        elif name == 'bb_status':
+            return self.bb_status
+        elif name == 'manufactured':
+            return 42
         else:
             raise KeyError
+    
+    def bb_method(self):
+        """Test Method"""
+        pass
+        
+    bb_status = 'screechy'
 
 
 def makeConnection():
@@ -152,13 +164,17 @@
 
     def testTraversePath( self ):
         self.failUnless( 'file' in self.folder1.objectIds() )
-        self.failUnless( self.folder1.unrestrictedTraverse( ('', 'folder1', 'file' ) ))
-        self.failUnless( self.folder1.unrestrictedTraverse( ('', 'folder1' ) ))
+        self.failUnless( 
+            self.folder1.unrestrictedTraverse( ('', 'folder1', 'file' ) ))
+        self.failUnless( 
+            self.folder1.unrestrictedTraverse( ('', 'folder1' ) ))
 
     def testTraverseURLNoSlash( self ):
         self.failUnless( 'file' in self.folder1.objectIds() )
-        self.failUnless( self.folder1.unrestrictedTraverse( '/folder1/file' ))
-        self.failUnless( self.folder1.unrestrictedTraverse( '/folder1' ))
+        self.failUnless( 
+            self.folder1.unrestrictedTraverse( '/folder1/file' ))
+        self.failUnless( 
+            self.folder1.unrestrictedTraverse( '/folder1' ))
 
     def testTraverseURLSlash( self ):
         self.failUnless( 'file' in self.folder1.objectIds() )
@@ -166,11 +182,15 @@
         self.failUnless( self.folder1.unrestrictedTraverse( '/folder1/' ))
 
     def testTraverseToNone( self ):
-        self.failUnlessRaises( KeyError, self.folder1.unrestrictedTraverse, ('', 'folder1', 'file2' ) )
-        self.failUnlessRaises( KeyError, self.folder1.unrestrictedTraverse,  '/folder1/file2' )
-        self.failUnlessRaises( KeyError, self.folder1.unrestrictedTraverse,  '/folder1/file2/' )
+        self.failUnlessRaises( 
+            KeyError, 
+            self.folder1.unrestrictedTraverse, ('', 'folder1', 'file2' ) )
+        self.failUnlessRaises( 
+            KeyError, self.folder1.unrestrictedTraverse,  '/folder1/file2' )
+        self.failUnlessRaises( 
+            KeyError, self.folder1.unrestrictedTraverse,  '/folder1/file2/' )
 
-    def testTraverseThroughBoboTraverse(self):
+    def testBoboTraverseToWrappedSubObj(self):
         # Verify it's possible to use __bobo_traverse__ with the
         # Zope security policy.
         noSecurityManager()
@@ -179,6 +199,33 @@
         self.failUnlessRaises(KeyError, bb.restrictedTraverse, 'notfound')
         bb.restrictedTraverse('bb_subitem')
 
+    def testBoboTraverseToMethod(self):
+        # Verify it's possible to use __bobo_traverse__ to a method.
+        noSecurityManager()
+        SecurityManager.setSecurityPolicy( self.oldPolicy )
+        bb = BoboTraversable()
+        self.failUnless(
+            bb.restrictedTraverse('bb_method') is not bb.bb_method)
+
+    def testBoboTraverseToSimpleAttrValue(self):
+        # Verify it's possible to use __bobo_traverse__ to a simple
+        # python value
+        noSecurityManager()
+        SecurityManager.setSecurityPolicy( self.oldPolicy )
+        bb = BoboTraversable()
+        self.assertEqual(bb.restrictedTraverse('bb_status'), 'screechy')
+
+    def testBoboTraverseToNonAttrValue(self):
+        # Verify it's possible to use __bobo_traverse__ to an
+        # arbitrary manufactured object
+        noSecurityManager()
+        # Default security policy always seems to deny in this case, which
+        # is fine, but to test the code branch we sub in the forgiving one
+        SecurityManager.setSecurityPolicy(UnitTestSecurityPolicy())
+        bb = BoboTraversable()
+        self.failUnless(
+            bb.restrictedTraverse('manufactured') is 42)
+
     def testAcquiredAttributeDenial(self):
         # Verify that restrictedTraverse raises the right kind of exception
         # on denial of access to an acquired attribute.  If it raises
@@ -190,6 +237,24 @@
         self.root.stuff = 'stuff here'
         self.failUnlessRaises(Unauthorized,
                               self.root.folder1.restrictedTraverse, 'stuff')
+    
+    def testDefaultValueWhenUnathorized(self):
+        # Test that traversing to an unauthorized object returns
+        # the default when provided
+        noSecurityManager()
+        SecurityManager.setSecurityPolicy(CruelSecurityPolicy())
+        newSecurityManager( None, UnitTestUser().__of__( self.root ) )
+        self.root.stuff = 'stuff here'
+        self.assertEqual(
+            self.root.folder1.restrictedTraverse('stuff', 42), 42)
+    
+    def testDefaultValueWhenNotFound(self):
+        # Test that traversing to a non-existent object returns
+        # the default when provided
+        noSecurityManager()
+        SecurityManager.setSecurityPolicy( self.oldPolicy )
+        self.assertEqual(
+            self.root.restrictedTraverse('happy/happy', 'joy'), 'joy')
 
 
 def test_suite():




More information about the Zope-Checkins mailing list