[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/tests - testHTTPServer.py:1.1.2.2 testHTTPServer2.py:NONE

Shane Hathaway shane@digicool.com
Tue, 27 Nov 2001 14:29:21 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/Server/tests
In directory cvs.zope.org:/tmp/cvs-serv32307

Modified Files:
      Tag: Zope-3x-branch
	testHTTPServer.py 
Removed Files:
      Tag: Zope-3x-branch
	testHTTPServer2.py 
Log Message:
Renamed testHTTPServer2.py to testHTTPServer.py, replacing the old tests.
The new tests have the same coverage, but they depend on the external
interface (the HTTP standard) rather than on internals.


=== Zope3/lib/python/Zope/Server/tests/testHTTPServer.py 1.1.2.1 => 1.1.2.2 ===
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 
 # FOR A PARTICULAR PURPOSE.
 
-""" Unit tests for Zope.Server.HTTPResponse """
-
 import unittest
-import Zope.Publisher.HTTP.IPayload
-from urlparse import urlparse
+from asyncore import socket_map, poll
+
+from threading import Thread
+from Zope.Server.TaskThreads import ThreadedTaskDispatcher
+from Zope.Server.HTTPServer import http_task, http_channel, http_server
+from Zope.Server.dual_mode_channel import pull_trigger
+from Zope.Server.Adjustments import Adjustments
+
+
+from httplib import HTTPConnection
+
+from time import sleep
+
+import sys
+sys.setcheckinterval(120)
+
+tasks = ThreadedTaskDispatcher()
+
+LOCALHOST = '127.0.0.1'
+
+my_adj = Adjustments()
+# Reduce overflows to make testing easier.
+my_adj.outbuf_overflow = 10000
+my_adj.inbuf_overflow = 10000
+
+
+class EchoHTTPTask (http_task):
+
+    def execute(self):
+        headers = self.request_data.headers
+        if headers.has_key('CONTENT_LENGTH'):
+            cl = headers['CONTENT_LENGTH']
+            self.response_headers['Content-Length'] = cl
+        instream = self.request_data.getBodyStream()
+        while 1:
+            data = instream.read(8192)
+            if not data:
+                break
+            self.write(data)
+
+
+class EchoHTTPChannel (http_channel):
+
+    task_class = EchoHTTPTask
+
+
+class EchoHTTPServer (http_server):
+
+    channel_class = EchoHTTPChannel
+
+
+
+class Tests(unittest.TestCase):
+
+    def setUp(self):
+        tasks.setThreadCount(4)
+        # Bind to any port on localhost.
+        self.server = EchoHTTPServer(LOCALHOST, 0, tasks=tasks, adj=my_adj)
+        self.port = self.server.socket.getsockname()[1]
+        self.run_loop = 1
+        self.counter = 0
+        self.thread = Thread(target=self.loop)
+        self.thread.start()
+        sleep(0.1)  # Give the thread some time to start.
+
+    def tearDown(self):
+        self.run_loop = 0
+        tasks.shutdown()
+        self.server.close()
+        self.thread.join()
+
+    def loop(self):
+        while self.run_loop:
+            self.counter = self.counter + 1
+            #print 'loop', self.counter
+            poll(0.1, socket_map)
+
+    def testEchoResponse(self, h=None, add_headers=None, body=''):
+        if h is None:
+            h = HTTPConnection(LOCALHOST, self.port)
+        h.putrequest('GET', '/')
+        h.putheader('Accept', 'text/plain')
+        if add_headers:
+            for k, v in add_headers.items():
+                h.putheader(k, v)
+        if body:
+            h.putheader('Content-Length', str(int(len(body))))
+        h.endheaders()
+        if body:
+            h.send(body)
+        response = h.getresponse()
+        self.failUnlessEqual(int(response.status), 200)
+        length = int(response.getheader('Content-Length', '0'))
+        response_body = response.read()
+        self.failUnlessEqual(length, len(response_body))
+        self.failUnlessEqual(response_body, body)
+
+    def testMultipleRequestsWithoutBody(self):
+        # Tests the use of multiple requests in a single connection.
+        h = HTTPConnection(LOCALHOST, self.port)
+        for n in range(3):
+            self.testEchoResponse(h)
+        self.testEchoResponse(h, {'Connection': 'close'})
+
+    def testMultipleRequestsWithBody(self):
+        # Tests the use of multiple requests in a single connection.
+        h = HTTPConnection(LOCALHOST, self.port)
+        for n in range(3):
+            self.testEchoResponse(h, body='Hello, world!')
+        self.testEchoResponse(h, {'Connection': 'close'})
+
+    def testLargeBody(self):
+        # Tests that large request bodies (1 MB, then 100 KB) are echoed back intact.
+        h = HTTPConnection(LOCALHOST, self.port)
+        s = 'This string has 32 characters.\r\n' * 32  # 1024 characters.
+        self.testEchoResponse(h, body=(s * 1024))  # 1 MB
+        self.testEchoResponse(h, {'Connection': 'close'},
+                            body=(s * 100))  # 100 KB
+
+    def testManyClients(self):
+        conns = []
+        for n in range(50):  # Linux kernel (2.4.8) doesn't like > 128 ?
+            #print 'open', n, clock()
+            h = HTTPConnection(LOCALHOST, self.port)
+            #h.debuglevel = 1
+            h.putrequest('GET', '/')
+            h.putheader('Accept', 'text/plain')
+            h.endheaders()
+            conns.append(h)
+            # If you uncomment the next line, you can raise the
+            # number of connections much higher without running
+            # into delays.
+            #sleep(0.01)
+        responses = []
+        for h in conns:
+            response = h.getresponse()
+            self.failUnlessEqual(response.status, 200)
+            responses.append(response)
+        for response in responses:
+            response.read()
+            
+
+
+def test_suite():
+    loader = unittest.TestLoader()
+    return loader.loadTestsFromTestCase(Tests)
 
-#
-#   Test shims.
-#
-class InitAny:
-
-    def __init__( self, **kw ):
-        self.__dict__.update( kw )
-    
-class FauxHandler( InitAny ):
-
-    data = request = None
-
-    def continue_request( self, data, request ):
-        self.data = data
-        self.request = request
-
-class FauxRequest( InitAny ):
-
-    collector = DEFAULT_COLLECTOR = []
-    uri = 'http://www.zope.org/Resources;special?id=abc#aargh'
-
-    def split_uri( self ):
-        scheme, nethost, path, parms, query, fragment = urlparse( self.uri )
-        if parms:
-            parms = ';%s' % parms # Compensate for some stupidity
-        if query:
-            query = '?%s' % query # Compensate for some stupidity
-        return path, parms, query, fragment
-
-class FauxChannel( InitAny ):
-
-    terminator = None
-    worked = 0
-
-    def set_terminator( self, value ):
-        self.terminator = value
-
-    def work( self ):
-        self.worked = 1
-
-class FauxServer( InitAny ):
-    pass
-
-class FauxRequestPayload:
-
-    __implements__ = Zope.Publisher.HTTP.IPayload.IRequestPayload
-
-    def processInputs(self, request):
-        """
-        Processes request inputs.
-        """
-        self.body = request.full_instream.read()
-
-    def getPublication(self, request):
-        """
-        Returns the publication object.
-        """
-
-    def debugInfo(self, request):
-        """
-        Returns text containing debugging information.
-        """
-
-
-class FauxResponsePayload:
-
-    __implements__ = Zope.Publisher.HTTP.IPayload.IResponsePayload
-
-    def setBody(self, response, body):
-        """
-        Sets the body of the response.
-        """
-
-    def handleException(self, response, exc_info):
-        """
-        Calls setBody() with an error response.
-        """
-
-#
-#   Test the collector.
-#
-class zhttp_collectorTest( unittest.TestCase ):
-
-    def setUp( self ):
-        self.handler = FauxHandler()
-        self.request = FauxRequest()
-        self.request.channel = self.channel = FauxChannel()
-
-    def tearDown( self ):
-        del self.channel
-        del self.request
-
-    def testZero( self ):
-        """
-            Test building a collector with size = 0
-        """
-        from Zope.Server.HTTPServer import zhttp_collector
-        collector = zhttp_collector( self.handler, self.request, 0 )
-        self.assertEqual( self.channel.terminator, 0 )
-        self.assertEqual( self.request.collector, collector )
-        self.failUnless( self.handler.data is None )
-        self.failUnless( self.handler.request is None )
-
-        collector.found_terminator()
-
-        self.assertEqual( self.handler.data.read(), '' )
-        self.failUnless( self.handler.request is self.request )
-
-    def testSmaller( self ):
-        from Zope.Server.HTTPServer import zhttp_collector
-        collector = zhttp_collector( self.handler, self.request, 15 )
-        self.assertEqual( self.channel.terminator, 15 )
-        self.assertEqual( self.request.collector, collector )
-        self.failUnless( self.handler.data is None )
-        self.failUnless( self.handler.request is None )
-
-        junk = 'x' * 15
-        collector.collect_incoming_data( junk )
-        collector.found_terminator()
-
-        self.assertEqual( self.handler.data.read(), junk )
-        self.failUnless( self.handler.request is self.request )
-
-    def testLarger( self ):
-        from Zope.Server.HTTPServer import zhttp_collector
-        collector = zhttp_collector( self.handler, self.request, 600000 )
-        self.assertEqual( self.channel.terminator, 600000 )
-        self.assertEqual( self.request.collector, collector )
-        self.failUnless( self.handler.data is None )
-        self.failUnless( self.handler.request is None )
-
-        junk = 'x' * 1000
-        for i in range( 600 ):
-            collector.collect_incoming_data( junk )
-        collector.found_terminator()
-
-        stuff = self.handler.data.read()
-        self.assertEqual( stuff, junk*600 )
-        self.failUnless( self.handler.request is self.request )
-
-#
-#   Test the handler.
-#
-class zhttp_handlerTest( unittest.TestCase ):
-
-    def setUp( self ):
-        self.request_payload = FauxRequestPayload()
-        self.response_payload = FauxResponsePayload()
-
-        self.resolver = InitAny( cache={ '123.45.67.89'
-                                       : ( 0, 1, 'my.host.tla' )
-                                       } )
-
-        self.logger = InitAny( resolver=self.resolver )
-
-        self.server = FauxServer( port=9673
-                                , server_name='localhost'
-                                , SERVER_IDENT='XYZZY'
-                                , logger=self.logger
-                                )
-
-        self.channel = FauxChannel( server=self.server
-                                  , creation_time='2001/01/01 00:00:00 GMT'
-                                  , addr=('123.45.67.89', )
-                                  , queue=[]
-                                  )
-
-        self.request = FauxRequest( channel=self.channel
-                                  , command='GET'
-                                  , version='3.14'
-                                  , header= ( 'Foo: foo', 'Bar-baz: bar-baz' )
-                                  )
-
-    def testEmpty( self ):
-
-        from Zope.Server.HTTPServer import zhttp_handler
-
-        handler = zhttp_handler( self.request_payload, self.response_payload )
-        self.failUnless( handler.request_payload is self.request_payload )
-        self.failUnless( handler.response_payload is self.response_payload )
-        self.failIf( handler.env_override )
-        self.failIf( handler.hits )
-
-        self.failUnless( handler.match( FauxRequest( uri = '/' ) ) )
-        self.failUnless( handler.match( FauxRequest( uri = '/abc' ) ) )
-        self.failUnless( handler.match( FauxRequest( uri = '/abc/def' ) ) )
-        self.failIf( handler.match( FauxRequest( uri = '#' ) ) )
-
-    def testInit( self ):
-
-        from Zope.Server.HTTPServer import zhttp_handler
-        override = { 'foo' : 1, 'bar' : 2 }
-
-        handler = zhttp_handler( self.request_payload
-                               , self.response_payload
-                               , 'abc'
-                               , override
-                               )
-        self.assertEqual( handler.env_override, override )
-        self.failIf( handler.hits )
-
-        self.failIf( handler.match( FauxRequest( uri = '/' ) ) )
-        self.failUnless( handler.match( FauxRequest( uri = '/abc' ) ) )
-        self.failUnless( handler.match( FauxRequest( uri = '/abc/def' ) ) )
-        self.failIf( handler.match( FauxRequest( uri = '#' ) ) )
-
-    def testGetEnvironment( self ):
-
-        from Zope.Server.HTTPServer import zhttp_handler
-
-        #
-        #   Test default URL, w/ no uri-base
-        #
-        req = self.request
-        req.command = 'sneer'
-
-        handler = zhttp_handler( self.request_payload, self.response_payload )
-        env = handler.get_environment( req )
-
-        self.assertEqual( env['REQUEST_METHOD'], 'SNEER' )
-        self.assertEqual( int( env['SERVER_PORT'] ), 9673 )
-        self.assertEqual( env['SERVER_NAME'], 'localhost' )
-        self.assertEqual( env['SERVER_SOFTWARE'], 'XYZZY' )
-        self.assertEqual( env['SERVER_PROTOCOL'], "HTTP/3.14" )
-        self.assertEqual( env['channel.creation_time']
-                        , '2001/01/01 00:00:00 GMT' )
-
-        self.assertEqual( env['SCRIPT_NAME'], '' )
-        self.assertEqual( env['PATH_INFO'], '/Resources;special' )
-        self.assertEqual( env['QUERY_STRING'], 'id=abc' )
-        self.assertEqual( env['GATEWAY_INTERFACE'], 'CGI/1.1' )
-        self.assertEqual( env['REMOTE_ADDR'], '123.45.67.89' )
-        self.assertEqual( env['REMOTE_HOST'], 'my.host.tla' )
-        self.assertEqual( env['HTTP_FOO'], 'foo' )
-        self.assertEqual( env['HTTP_BAR_BAZ'], 'bar-baz' )
-
-        #
-        #   Test with a different URL, and a uri-base.
-        #
-        req = FauxRequest( channel=self.channel
-                         , command='glower'
-                         , version='3.14'
-                         , header= ()
-                         , uri='http://www.zope.org/hmm/WikiCentral/DevWiki'
-                         )
-
-        handler = zhttp_handler( self.request_payload
-                               , self.response_payload
-                               , uri_base='hmm'
-                               )
-        env = handler.get_environment( req )
-
-        self.assertEqual( env['SCRIPT_NAME'], '/hmm' )
-        self.assertEqual( env['PATH_INFO'], '/WikiCentral/DevWiki' )
-        self.failIf( env.get( 'QUERY_STRING', None ) is not None )
-        self.failIf( env.get( 'HTTP_FOO', None ) is not None )
-        self.failIf( env.get( 'HTTP_BAR_BAZ', None ) is not None )
-
-    def testHandleNoLength( self ):
-
-        from Zope.Server.HTTPServer import zhttp_handler
-
-        self.channel.current_request = req = self.request
-
-        handler = zhttp_handler( self.request_payload, self.response_payload )
-
-        self.failIf( handler.hits.as_long() )
-        self.failIf( self.channel.worked )
-
-        handler.handle_request( req )
-
-        self.assertEqual( handler.hits.as_long(), 1 )
-        self.assertEqual( self.channel.current_request, None )
-        self.assertEqual( len( self.channel.queue ), 1 )
-        self.failUnless( self.channel.worked )
-
-        httpreq = self.channel.queue[0]
-        self.assertEqual( httpreq[ 'REQUEST_METHOD' ], 'GET' )
-        self.assertEqual( int( httpreq['SERVER_PORT'] ), 9673 )
-        self.assertEqual( httpreq['SERVER_NAME'], 'localhost' )
-        self.assertEqual( httpreq['SERVER_SOFTWARE'], 'XYZZY' )
-        self.assertEqual( httpreq['SERVER_PROTOCOL'], "HTTP/3.14" )
-        self.assertEqual( httpreq['SCRIPT_NAME'], '' )
-        self.assertEqual( httpreq['PATH_INFO'], '/Resources;special' )
-        self.assertEqual( httpreq['QUERY_STRING'], 'id=abc' )
-        self.assertEqual( httpreq['GATEWAY_INTERFACE'], 'CGI/1.1' )
-        self.assertEqual( httpreq['REMOTE_ADDR'], '123.45.67.89' )
-        self.assertEqual( httpreq['REMOTE_HOST'], 'my.host.tla' )
-        self.assertEqual( httpreq['HTTP_FOO'], 'foo' )
-        self.assertEqual( httpreq['HTTP_BAR_BAZ'], 'bar-baz' )
-
-        self.assertEqual( httpreq.response._http_version, req.version )
-        self.assertEqual( httpreq.response._server_version, 'XYZZY' )
-
-    def testHandleWithLength( self ):
-
-        from Zope.Server.HTTPServer import zhttp_handler
-
-        self.channel.current_request = req = self.request
-        req.header = ( 'Content-length: 100', )
-
-        handler = zhttp_handler( self.request_payload, self.response_payload )
-
-        self.failIf( handler.hits.as_long() )
-        self.failIf( self.channel.worked )
-
-        handler.handle_request( req )
-
-        self.assertEqual( handler.hits.as_long(), 1 )
-        self.assertEqual( self.channel.current_request, req )
-        self.assertEqual( len( self.channel.queue ), 0 )
-        self.failIf( self.channel.worked )
-
-        collector = getattr( req, 'collector', None )
-        self.failUnless( collector is not None )
-
-        junk = 'x' * 100
-        collector.collect_incoming_data( junk )
-        collector.found_terminator()
-
-        self.assertEqual( self.channel.current_request, None )
-        self.assertEqual( len( self.channel.queue ), 1 )
-        self.failUnless( self.channel.worked )
-
-        httpreq = self.channel.queue[0]
-        httpreq.processInputs()
-        self.assertEqual( self.request_payload.body, junk )
+if __name__=='__main__':
+    unittest.TextTestRunner().run( test_suite() )

=== Removed File Zope3/lib/python/Zope/Server/tests/testHTTPServer2.py ===