[Zope3-checkins] SVN: Zope3/trunk/src/zope/app/tests/functional.py - refactor cookie handling in the functional test framework

Fred L. Drake, Jr. fdrake at gmail.com
Tue Jan 11 12:24:49 EST 2005


Log message for revision 28779:
  - refactor cookie handling in the functional test framework
  - add support for cookies in functional doctests
  

Changed:
  U   Zope3/trunk/src/zope/app/tests/functional.py

-=-
Modified: Zope3/trunk/src/zope/app/tests/functional.py
===================================================================
--- Zope3/trunk/src/zope/app/tests/functional.py	2005-01-11 14:00:30 UTC (rev 28778)
+++ Zope3/trunk/src/zope/app/tests/functional.py	2005-01-11 17:24:49 UTC (rev 28779)
@@ -143,7 +143,7 @@
             # strang FunctionalTestSetup is.  Later, when we
             # have time, we should clean up this (perhaps with an
             # event) and clean up FunctionalTestSetup.
-            response = http(grant_request, handle_errors=False)
+            response = HTTPCaller()(grant_request, handle_errors=False)
             FunctionalTestSetup().connection = None
             
         elif config_file and config_file != self._config_file:
@@ -205,17 +205,38 @@
     def abort(self):
         abort()
 
-class BrowserTestCase(FunctionalTestCase):
-    """Functional test case for Browser requests."""
 
-    def setUp(self):
-        super(BrowserTestCase, self).setUp()
+class CookieHandler(object):
+
+    def __init__(self, *args, **kw):
         # Somewhere to store cookies between consecutive requests
         self.cookies = SimpleCookie()
+        super(CookieHandler, self).__init__(*args, **kw)
 
+    def httpCookie(self, path):
+        """Return self.cookies as an HTTP_COOKIE environment value."""
+        l = [m.OutputString() for m in self.cookies.values()
+             if path.startswith(m['path'])]
+        return '; '.join(l)
+
+    def loadCookies(self, envstring):
+        self.cookies.load(envstring)
+
+    def saveCookies(self, response):
+        """Save cookies from the response."""
+        # Urgh - need to play with the response's privates to extract
+        # cookies that have been set
+        for k,v in response._cookies.items():
+            k = k.encode('utf8')
+            self.cookies[k] = v['value'].encode('utf8')
+            if self.cookies[k].has_key('Path'):
+                self.cookies[k]['Path'] = v['Path']
+
+
+class BrowserTestCase(CookieHandler, FunctionalTestCase):
+    """Functional test case for Browser requests."""
+
     def tearDown(self):
-        del self.cookies
-
         self.setSite(None)
         super(BrowserTestCase, self).tearDown()
 
@@ -246,7 +267,7 @@
             outstream = HTTPTaskStub()
         environment = {"HTTP_HOST": 'localhost',
                        "HTTP_REFERER": 'localhost',
-                       "HTTP_COOKIE": self.__http_cookie(path)}
+                       "HTTP_COOKIE": self.httpCookie(path)}
         environment.update(env)
         app = FunctionalTestSetup().getApplication()
         request = app._request(path, '', outstream,
@@ -256,12 +277,6 @@
         zope.interface.directlyProvides(request, _getDefaultSkin())
         return request
 
-    def __http_cookie(self, path):
-        '''Return self.cookies as an HTTP_COOKIE environment format string'''
-        l = [m.OutputString() for m in self.cookies.values()
-                if path.startswith(m['path'])]
-        return '; '.join(l)
-
     def publish(self, path, basic=None, form=None, env={},
                 handle_errors=False):
         """Renders an object at a given location.
@@ -284,22 +299,16 @@
         # We pull it apart and reassemble the header to block cookies
         # with invalid paths going through, which may or may not be correct
         if env.has_key('HTTP_COOKIE'):
-            self.cookies.load(env['HTTP_COOKIE'])
+            self.loadCookies(env['HTTP_COOKIE'])
             del env['HTTP_COOKIE'] # Added again in makeRequest
 
         request = self.makeRequest(path, basic=basic, form=form, env=env,
                                    outstream=outstream)
         response = ResponseWrapper(request.response, outstream, path)
         if env.has_key('HTTP_COOKIE'):
-            self.cookies.load(env['HTTP_COOKIE'])
+            self.loadCookies(env['HTTP_COOKIE'])
         publish(request, handle_errors=handle_errors)
-        # Urgh - need to play with the response's privates to extract
-        # cookies that have been set
-        for k,v in response._cookies.items():
-            k = k.encode('utf8')
-            self.cookies[k] = v['value'].encode('utf8')
-            if self.cookies[k].has_key('Path'):
-                self.cookies[k]['Path'] = v['Path']
+        self.saveCookies(response)
         self.setSite(old_site)
         return response
 
@@ -474,82 +483,7 @@
     def getBody(self):
         return self.getOutput()
 
-def http(request_string, handle_errors=True):
-    """Execute an HTTP request string via the publisher
 
-    This is used for HTTP doc tests.
-    """
-    # Commit work done by previous python code.
-    commit()
-
-    # Discard leading white space to make call layout simpler
-    request_string = request_string.lstrip()
-
-    # split off and parse the command line
-    l = request_string.find('\n')
-    command_line = request_string[:l].rstrip()
-    request_string = request_string[l+1:]
-    method, path, protocol = command_line.split()
-    path = urllib.unquote(path)
-    
-
-    instream = StringIO(request_string)
-    environment = {"HTTP_HOST": 'localhost',
-                   "HTTP_REFERER": 'localhost',
-                   "REQUEST_METHOD": method,
-                   "SERVER_PROTOCOL": protocol,
-                   }
-
-    headers = [split_header(header)
-               for header in rfc822.Message(instream).headers]
-    for name, value in headers:
-        name = ('_'.join(name.upper().split('-')))
-        if name not in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
-            name = 'HTTP_' + name
-        environment[name] = value.rstrip()
-
-    auth_key = 'HTTP_AUTHORIZATION'
-    if environment.has_key(auth_key):
-        environment[auth_key] = auth_header(environment[auth_key])
-
-    outstream = HTTPTaskStub()
-
-
-    old_site = getSite()
-    setSite(None)
-    app = FunctionalTestSetup().getApplication()
-    header_output = HTTPHeaderOutput(
-        protocol, ('x-content-type-warning', 'x-powered-by'))
-
-    if method in ('GET', 'POST', 'HEAD'):
-        if (method == 'POST' and
-            environment.get('CONTENT_TYPE', '').startswith('text/xml')
-            ):
-            request_cls = XMLRPCRequest
-            publication_cls = XMLRPCPublication
-        else:
-            request_cls = type(BrowserRequest.__name__, (BrowserRequest,), {})
-            zope.interface.classImplements(request_cls, _getDefaultSkin())
-            publication_cls = BrowserPublication
-    else:
-        request_cls = HTTPRequest
-        publication_cls = HTTPPublication
-    
-    request = app._request(path, instream, outstream,
-                           environment=environment,
-                           request=request_cls, publication=publication_cls)
-    request.response.setHeaderOutput(header_output)
-    response = DocResponseWrapper(request.response, outstream, path,
-                                  header_output)
-    
-    publish(request, handle_errors=handle_errors)
-    setSite(old_site)
-
-    # sync Python connection:
-    getRootFolder()._p_jar.sync()
-    
-    return response
-
 headerre = re.compile('(\S+): (.+)$')
 def split_header(header):
     return headerre.match(header).group(1, 2)
@@ -603,9 +537,88 @@
     suite.addTest(unittest.makeSuite(SampleFunctionalTest))
     return suite
 
+
+class HTTPCaller(CookieHandler):
+    """Execute an HTTP request string via the publisher"""
+
+    def __call__(self, request_string, handle_errors=True):
+        # Commit work done by previous python code.
+        commit()
+
+        # Discard leading white space to make call layout simpler
+        request_string = request_string.lstrip()
+
+        # split off and parse the command line
+        l = request_string.find('\n')
+        command_line = request_string[:l].rstrip()
+        request_string = request_string[l+1:]
+        method, path, protocol = command_line.split()
+        path = urllib.unquote(path)
+
+        instream = StringIO(request_string)
+        environment = {"HTTP_COOKIE": self.httpCookie(path),
+                       "HTTP_HOST": 'localhost',
+                       "HTTP_REFERER": 'localhost',
+                       "REQUEST_METHOD": method,
+                       "SERVER_PROTOCOL": protocol,
+                       }
+
+        headers = [split_header(header)
+                   for header in rfc822.Message(instream).headers]
+        for name, value in headers:
+            name = ('_'.join(name.upper().split('-')))
+            if name not in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
+                name = 'HTTP_' + name
+            environment[name] = value.rstrip()
+
+        auth_key = 'HTTP_AUTHORIZATION'
+        if environment.has_key(auth_key):
+            environment[auth_key] = auth_header(environment[auth_key])
+
+        outstream = HTTPTaskStub()
+
+        old_site = getSite()
+        setSite(None)
+        app = FunctionalTestSetup().getApplication()
+        header_output = HTTPHeaderOutput(
+            protocol, ('x-content-type-warning', 'x-powered-by'))
+
+        if method in ('GET', 'POST', 'HEAD'):
+            if (method == 'POST' and
+                environment.get('CONTENT_TYPE', '').startswith('text/xml')
+                ):
+                request_cls = XMLRPCRequest
+                publication_cls = XMLRPCPublication
+            else:
+                request_cls = type(BrowserRequest.__name__,
+                                   (BrowserRequest,),
+                                   {})
+                zope.interface.classImplements(request_cls, _getDefaultSkin())
+                publication_cls = BrowserPublication
+        else:
+            request_cls = HTTPRequest
+            publication_cls = HTTPPublication
+
+        request = app._request(
+            path, instream, outstream,
+            environment=environment,
+            request=request_cls, publication=publication_cls)
+        request.response.setHeaderOutput(header_output)
+        response = DocResponseWrapper(
+            request.response, outstream, path, header_output)
+
+        publish(request, handle_errors=handle_errors)
+        self.saveCookies(response)
+        setSite(old_site)
+
+        # sync Python connection:
+        getRootFolder()._p_jar.sync()
+
+        return response
+
 def FunctionalDocFileSuite(*paths, **kw):
     globs = kw.setdefault('globs', {})
-    globs['http'] = http
+    globs['http'] = HTTPCaller()
     globs['getRootFolder'] = getRootFolder
     globs['sync'] = sync
 



More information about the Zope3-Checkins mailing list