[Zope-Checkins] SVN: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/ Coverage for ZPublisher.WSGIPublisher.publish.

Tres Seaver tseaver at palladion.com
Mon May 31 14:26:04 EDT 2010


Log message for revision 112875:
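  Add test coverage for ZPublisher.WSGIPublisher.publish.  publish() now
  accepts an optional '_get_module_info' hook (for testing only) and no
  longer takes 'after_list' / 'debug'; cleanup callables are recorded on
  'response.after_list' instead.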

Changed:
  U   Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py
  U   Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py

-=-
Modified: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py
===================================================================
--- Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py	2010-05-31 17:27:45 UTC (rev 112874)
+++ Zope/branches/tseaver-fix_wsgi/src/ZPublisher/WSGIPublisher.py	2010-05-31 18:26:04 UTC (rev 112875)
@@ -56,6 +56,9 @@
     # HTTP/1.1 should use chunked encoding
     http_chunk = 0
 
+    # Tuple of "cleanup" callables; extend it with ``+=`` (it is not a list).
+    after_list = ()
+
     def finalize(self):
 
         headers = self.headers
@@ -144,7 +147,12 @@
         #        return ''
         raise NotImplementedError
 
-def publish(request, module_name, after_list, debug=0):
+def publish(request, module_name,
+            _get_module_info=None,  # only for testing
+           ):
+    if _get_module_info is None:
+        _get_module_info = get_module_info
+
     (bobo_before,
      bobo_after,
      object,
@@ -153,12 +161,13 @@
      err_hook,
      validated_hook,
      transactions_manager,
-    )= get_module_info(module_name)
+    )= _get_module_info(module_name)
 
     request.processInputs()
     response = request.response
 
-    after_list[0] = bobo_after
+    if bobo_after is not None:
+        response.after_list += (bobo_after,)
 
     if debug_mode:
         response.debug_mode = debug_mode
@@ -197,7 +206,6 @@
 
 def publish_module(environ, start_response):
     status = 200
-    after_list = [None]
     stdout = StringIO()
     stderr = StringIO()
     response = WSGIResponse(stdout=stdout, stderr=stderr)
@@ -208,13 +216,9 @@
     request = HTTPRequest(environ['wsgi.input'], environ, response)
     if ISkinnable.providedBy(request):
         setDefaultSkin(request)
-        
-    # Let's support post-mortem debugging
-    handle_errors = environ.get('wsgi.handleErrors', True)
 
     try:
-        response = publish(request, 'Zope2', after_list=[None],
-                        debug=handle_errors)
+        response = publish(request, 'Zope2')
     except Unauthorized, v:
         pass
     except Redirect, v:
@@ -241,8 +245,8 @@
 
     stdout.close()
 
-    if after_list[0] is not None:
-        after_list[0]()
+    for callable in response.after_list:
+        callable()
 
     # Return the result body iterable.
     return result

Modified: Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py
===================================================================
--- Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py	2010-05-31 17:27:45 UTC (rev 112874)
+++ Zope/branches/tseaver-fix_wsgi/src/ZPublisher/tests/test_WSGIPublisher.py	2010-05-31 18:26:04 UTC (rev 112875)
@@ -154,6 +154,71 @@
         self.assertRaises(NotImplementedError, lambda: str(response))
 
 
+class Test_publish(unittest.TestCase):
+
+    def _callFUT(self, request, module_name, _get_module_info=None):
+        from ZPublisher.WSGIPublisher import publish
+        if _get_module_info is None:
+            return publish(request, module_name)
+
+        return publish(request, module_name, _get_module_info)
+
+    def test_invalid_module_doesnt_catch_error(self):
+        _gmi = DummyCallable()
+        _gmi._raise = ImportError('testing')
+        self.assertRaises(ImportError, self._callFUT, None, 'nonesuch', _gmi)
+        self.assertEqual(_gmi._called_with, (('nonesuch',), {}))
+
+    def test_wo_REMOTE_USER(self):
+        request = DummyRequest(PATH_INFO='/')
+        response = request.response = DummyResponse()
+        _before = DummyCallable()
+        _after = object()
+        _object = DummyCallable()
+        _object._result = 'RESULT'
+        request._traverse_to = _object
+        _realm = 'TESTING'
+        _debug_mode = True
+        _err_hook = DummyCallable()
+        _validated_hook = object()
+        _tm = DummyTM()
+        _gmi = DummyCallable()
+        _gmi._result = (_before, _after, _object, _realm, _debug_mode,
+                        _err_hook, _validated_hook, _tm)
+        returned = self._callFUT(request, 'okmodule', _gmi)
+        self.failUnless(returned is response)
+        self.assertEqual(_gmi._called_with, (('okmodule',), {}))
+        self.failUnless(request._processedInputs)
+        self.assertEqual(response.after_list, (_after,))
+        self.failUnless(response.debug_mode)
+        self.assertEqual(response.realm, 'TESTING')
+        self.assertEqual(_before._called_with, ((), {}))
+        self.assertEqual(request['PARENTS'], [_object])
+        self.assertEqual(request._traversed, ('/', None, _validated_hook))
+        self.assertEqual(_tm._recorded, (_object, request))
+        self.assertEqual(_object._called_with, ((), {}))
+        self.assertEqual(response._body, 'RESULT')
+        self.assertEqual(_err_hook._called_with, None)
+
+    def test_w_REMOTE_USER(self):
+        request = DummyRequest(PATH_INFO='/', REMOTE_USER='phred')
+        response = request.response = DummyResponse()
+        _before = DummyCallable()
+        _after = object()
+        _object = DummyCallable()
+        _object._result = 'RESULT'
+        request._traverse_to = _object
+        _realm = 'TESTING'
+        _debug_mode = True
+        _err_hook = DummyCallable()
+        _validated_hook = object()
+        _tm = DummyTM()
+        _gmi = DummyCallable()
+        _gmi._result = (_before, _after, _object, _realm, _debug_mode,
+                        _err_hook, _validated_hook, _tm)
+        self._callFUT(request, 'okmodule', _gmi)
+        self.assertEqual(response.realm, None)
+
 class Test_publish_module(unittest.TestCase):
     
     def setUp(self):
@@ -223,12 +288,50 @@
                          ('', 'foobar'))
     
 
+class DummyRequest(dict):
+    _processedInputs = False
+    _traversed = None
+    _traverse_to = None
+    args = ()
+
+    def processInputs(self):
+        self._processedInputs = True
+
+    def traverse(self, path, response=None, validated_hook=None):
+        self._traversed = (path, response, validated_hook)
+        return self._traverse_to
+
+class DummyResponse(object):
+    debug_mode = False
+    after_list = ()
+    realm = None
+    _body = None
+
+    def setBody(self, body):
+        self._body = body
+
+class DummyCallable(object):
+    _called_with = _raise = _result = None
+
+    def __call__(self, *args, **kw):
+        self._called_with = (args, kw)
+        if self._raise:
+            raise self._raise
+        return self._result
+
+class DummyTM(object):
+    _recorded = _raise = _result = None
+
+    def recordMetaData(self, *args):
+        self._recorded = args
+
 def noopStartResponse(status, headers):
     pass
 
 
 def test_suite():
     return unittest.TestSuite((
-        unittest.makeSuite(WSGIResponseTests, 'test'),
-        unittest.makeSuite(Test_publish_module, 'test'),
+        unittest.makeSuite(WSGIResponseTests),
+        unittest.makeSuite(Test_publish),
+        unittest.makeSuite(Test_publish_module),
     ))



More information about the Zope-Checkins mailing list