[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server - chunking.py:1.1.2.1 HTTPServer2.py:1.1.2.3 PublisherServers.py:1.1.2.2 dual_mode_channel.py:1.1.2.3

Shane Hathaway shane@digicool.com
Mon, 26 Nov 2001 11:16:08 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/Server
In directory cvs.zope.org:/tmp/cvs-serv23433

Modified Files:
      Tag: Zope-3x-branch
	HTTPServer2.py PublisherServers.py dual_mode_channel.py 
Added Files:
      Tag: Zope-3x-branch
	chunking.py 
Log Message:
Closer to full integration with Publisher.


=== Added File Zope3/lib/python/Zope/Server/chunking.py ===


class ChunkedReceiver:
    """
    Incremental decoder for an HTTP/1.1 "chunked" request body.

    Data is fed in arbitrary pieces through received().  Decoded chunk
    payload is appended to the buffer given to the constructor; the
    control lines (chunk sizes) and the optional trailer are parsed here
    and never reach the buffer.

    State attributes:
      chunk_remainder     -- payload bytes still expected for the
                             chunk currently being read.
      control_line        -- partial chunk-size line carried over from
                             a previous received() call.
      all_chunks_received -- set once the zero-size chunk is seen.
      trailer             -- trailer text collected so far (complete
                             once finished is set).
      finished            -- set once the whole body, including the
                             trailer terminator, has been consumed.
    """

    chunk_remainder = 0
    control_line = ''
    all_chunks_received = 0
    trailer = ''
    finished = 0

    # max_control_line = 1024
    # max_trailer = 65536

    def __init__(self, buf):
        """
        buf receives the decoded payload.  It must provide append(data);
        getfile() is only needed if getfile() is called on this object.
        """
        self.buf = buf

    def received(self, s):
        """
        Consumes a piece of the chunked stream.

        Returns the number of bytes of s that belong to this body.
        Anything after the end of the body is left unconsumed so the
        caller can hand it to the next request.  Returns 0 once the
        body is finished.

        Raises ValueError if a chunk size is not a valid, non-negative
        hexadecimal number.
        """
        if self.finished:
            return 0
        orig_size = len(s)
        while s:
            rm = self.chunk_remainder
            if rm > 0:
                # Receive the remainder of a chunk.
                to_write = s[:rm]
                self.buf.append(to_write)
                written = len(to_write)
                s = s[written:]
                self.chunk_remainder -= written
            elif not self.all_chunks_received:
                # Receive a control line.  Prepend whatever part of the
                # line arrived in an earlier call.
                s = self.control_line + s
                pos = s.find('\r\n')
                if pos < 0:
                    # Control line not finished; keep it for next time.
                    self.control_line = s
                    s = ''
                else:
                    # Control line finished.
                    line = s[:pos]
                    s = s[pos + 2:]
                    self.control_line = ''
                    if line:
                        # Begin a new chunk.
                        semi = line.find(';')
                        if semi >= 0:
                            # discard extension info.
                            line = line[:semi]
                        # Chunk sizes are hexadecimal (RFC 2616, 3.6.1).
                        sz = int(line, 16)
                        if sz < 0:
                            raise ValueError(
                                'Negative chunk size: %r' % line)
                        if sz > 0:
                            # Start a new chunk.
                            self.chunk_remainder = sz
                        else:
                            # Finished chunks.
                            self.all_chunks_received = 1
                    # else this was the blank line that follows chunk
                    # data; expect another control line.
            else:
                # Receive the trailer.  Prepend whatever part of it
                # arrived in an earlier call.
                s = self.trailer + s
                if s[:2] == '\r\n':
                    # No trailer.
                    self.finished = 1
                    self.trailer = ''
                    return orig_size - (len(s) - 2)
                pos = s.find('\r\n\r\n')
                if pos >= 0:
                    # Finished the trailer.
                    self.finished = 1
                    self.trailer = s[:pos + 2]
                    return orig_size - (len(s) - (pos + 4))
                # Trailer not finished; keep it for next time.  Without
                # this the loop would never terminate on partial input.
                self.trailer = s
                s = ''
        return orig_size

    def getfile(self):
        """
        Returns whatever the underlying buffer's getfile() returns.
        """
        return self.buf.getfile()



=== Zope3/lib/python/Zope/Server/HTTPServer2.py 1.1.2.2 => 1.1.2.3 ===
 # FOR A PARTICULAR PURPOSE.
 
-# This server uses asyncore to accept connections and do initial
-# processing but threads to do work.
+"""
+This server uses asyncore to accept connections and do initial
+processing but threads to do work.
+"""
 
 SIMULT_MODE = 0
 
@@ -78,7 +80,6 @@
         """
         try:
             try:
-                self.init()
                 self.execute()
                 self.finish()
             finally:
@@ -156,10 +157,6 @@
         res = '%s\r\n\r\n' % '\r\n'.join(lines)
         return res
 
-    def init(self):
-        """
-        """
-
     def execute(self):
         """
         Override this.
@@ -191,9 +188,9 @@
 
     completed = 0
 
-    def __init__(self, cl):
+    def __init__(self, cl, buf):
         self.remain = cl
-        self.buf = OverflowableBuffer(525000)  # TODO: make configurable
+        self.buf = buf
 
     def received(self, data):
         rm = self.remain
@@ -217,17 +214,21 @@
 
 class http_request_data:
 
-    completed = 0
-    bad = 0
+    completed = 0  # Set once request is completed.
+    empty = 0        # Set if no request was made.
     header_plus = ''
     chunked = 0
     content_length = 0
     body_rcv = None
-    # Other attributes: first_line, header, headers, command, uri, version
+    # Other attributes: first_line, header, headers, command, uri, version,
+    # path, query, fragment
+
+    # headers is a mapping containing keys translated to uppercase
+    # with dashes turned into underscores.
 
     def received(self, data):
         """
-        Receives the HTTP stream.
+        Receives the HTTP stream for one request.
         Returns the number of bytes consumed.
         """
         if self.completed:
@@ -246,17 +247,12 @@
                 # Remove preceeding blank lines.
                 header_plus = header_plus.lstrip()
                 if not header_plus:
-                    # No request was made.
-                    self.handle_error('No request')
-                    return datalen
+                    self.empty = 1
+                    self.completed = 1
                 else:
-                    try:
-                        self.parse_header(header_plus)
-                        if self.body_rcv is None:
-                            self.completed = 1
-                    except:
-                        self.handle_error()
-                        return datalen
+                    self.parse_header(header_plus)
+                    if self.body_rcv is None:
+                        self.completed = 1
                 return consumed
             else:
                 # Header not finished yet.
@@ -264,21 +260,11 @@
                 return datalen
         else:
             # In body.
-            try:
-                consumed = br.received(data)
-            except:
-                self.handle_error()
-                return datalen
+            consumed = br.received(data)
             if br.completed:
                 self.completed = 1
             return consumed
 
-    def handle_error(self, msg=None):
-        # TODO: generate a response saying bad header?
-        import traceback
-        traceback.print_exc()
-        self.bad = 1
-        self.completed = 1
 
     def parse_header(self, header_plus):
         index = header_plus.find('\r\n')
@@ -308,17 +294,22 @@
             uri = unquote(uri)
         self.uri = str(uri)
         self.version = version
+        self.split_uri()
 
         if version == '1.1':
             te = headers.get('TRANSFER_ENCODING', '')
             if te == 'chunked':
+                from chunking import ChunkedReceiver
                 self.chunked = 1
-                self.body_rcv = ChunkedReceiver()
+                buf = OverflowableBuffer(525000)  # TODO: make configurable
+                self.body_rcv = ChunkedReceiver(buf)
         if not self.chunked:
             cl = int(headers.get('CONTENT_LENGTH', 0))
             self.content_length = cl
             if cl > 0:
-                self.body_rcv = StreamedReceiver(cl)
+                buf = OverflowableBuffer(525000)  # TODO: make configurable
+                self.body_rcv = StreamedReceiver(cl, buf)
+
 
     def get_header_lines(self):
         """
@@ -348,8 +339,32 @@
         else:
             return None, None, None
 
+    path_regex = re.compile (
+    #     path    query     fragment
+        r'([^?#]*)(\?[^#]*)?(#.*)?'
+        )
+    
+    def split_uri(self):
+        m = self.path_regex.match (self.uri)
+        if m.end() != len(self.uri):
+            raise ValueError, "Broken URI"
+        else:
+            self.path, query, self.fragment = m.groups()
+            if query:
+                query = query[1:]
+            self.query = query
+
+    def getBodyStream(self):
+        body_rcv = self.body_rcv
+        if body_rcv is not None:
+            return body_rcv.getfile()
+        else:
+            return StringIO('')
+
+
 
-tasklock = allocate_lock()
+# Synchronize access to request queues.
+request_queue_lock = allocate_lock()
 
 
 class http_channel (channel_type):
@@ -359,13 +374,21 @@
     active_channels = {}  # Class-specific channel counter
     proto_request = None
     ready_requests = None  # A list
-    task_running = 0
+    wedged = 0
+
 
     def add_channel(self, map=None):
+        """
+        Keeps track of opened HTTP channels.
+        """
         channel_type.add_channel(self, map)
         self.active_channels[self._fileno] = self
 
+
     def del_channel(self, map=None):
+        """
+        Keeps track of opened HTTP channels.
+        """
         channel_type.del_channel(self, map)
         ac = self.active_channels
         fd = self._fileno
@@ -373,15 +396,31 @@
             del ac[fd]
         # print 'active HTTP channels:', len(ac)
 
+
     def received(self, data):
+        """
+        Receives input asynchronously and launches or queues requests.
+        """
+        if self.wedged:
+            # Ignore input after a bad request.
+            return
         preq = self.proto_request
         while data:
             if preq is None:
                 preq = http_request_data()
-            n = preq.received(data)
+            try:
+                n = preq.received(data)
+            except:
+                # Bad header format or request.  Can't accept more requests.
+                # TODO: use logging.
+                import traceback
+                traceback.print_exc()
+                self.wedged = 1
+                return
             if preq.completed:
-                # Ready.
-                self.queue_request(preq)
+                # The request is ready to use.
+                if not preq.empty:
+                    self.queue_request(preq)
                 preq = None
                 self.proto_request = None
             else:
@@ -390,43 +429,54 @@
                 break
             data = data[n:]
 
+
     def queue_request(self, req):
         rr = self.ready_requests
+        do_now = 1
         if rr is None:
             # First request--no need to lock.
             self.ready_requests = []
         else:
-            tasklock.acquire()
+            request_queue_lock.acquire()
             try:
                 if rr:
                     # The request will be executed when the current
                     # task is finished.
                     rr.append(req)
-                    return
+                    do_now = 0
                 # else no task is running.
             finally:
-                tasklock.release()
-        self.set_sync()
-        self.start_task(req)
+                request_queue_lock.release()
+        if do_now:
+            self.set_sync()
+            self.create_task(req)
+
 
-    def start_task(self, req):
+    def create_task(self, req):
         task = self.task_class(self, req)
         self.server.addTask(task)
 
+
     def end_task(self, close):
         if close:
             self.close_when_done()
         else:
-            tasklock.acquire()
+            new_task = 0
+            req = None
+            request_queue_lock.acquire()
             try:
                 rr = self.ready_requests
                 if rr:
                     req = rr.pop(0)
-                    self.start_task(req)
-                else:
-                    self.set_async()
+                    new_task = 1
             finally:
-                tasklock.release()
+                request_queue_lock.release()
+            if new_task:
+                # Respond to the next request.
+                self.create_task(req)
+            else:
+                # Wait for another request on this connection.
+                self.set_async()
             
 
 
@@ -434,14 +484,29 @@
 
     channel_class = http_channel
 
+    SERVER_IDENT = 'Zope.Server.HTTPServer.http_server'
+
     def __init__(self, ip, port, backlog=1024, tasks=None):
         # Assumes sock is already bound.
         asyncore.dispatcher.__init__(self)
+        self.port = port
         self.tasks = tasks
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
         self.set_reuse_addr()
         self.bind((ip, port))
+
+        host, port = self.socket.getsockname()
+        if not ip:
+            self.log_info('Computing default hostname', 'info')
+            ip = socket.gethostbyname (socket.gethostname())
+        try:
+            self.server_name = socket.gethostbyaddr (ip)[0]
+        except socket.error:
+            self.log_info('Cannot do reverse lookup', 'info')
+            self.server_name = ip       # use the IP address as the "hostname"
+        
         self.listen(backlog)
+
 
     def writable (self):
         return 0


=== Zope3/lib/python/Zope/Server/PublisherServers.py 1.1.2.1 => 1.1.2.2 ===
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 
+# FOR A PARTICULAR PURPOSE.
 
 
+from os import path as ospath
+
 from HTTPServer2 import http_task, http_channel, http_server
 
 from Zope.Publisher.Publish import publish
 from Zope.Publisher.HTTP.HTTPRequest import HTTPRequest
 from Zope.Publisher.HTTP.HTTPResponse import HTTPResponse
 
+rename_headers = {
+    'CONTENT_LENGTH' : 'CONTENT_LENGTH',
+    'CONTENT_TYPE'   : 'CONTENT_TYPE',
+    'CONNECTION'     : 'CONNECTION_TYPE',
+    }
+
 
 class PublisherHTTPTask (http_task):
 
     def execute(self):
         server = self.channel.server
+        env = self.create_environment()
+        instream = self.request_data.getBodyStream()
         resp = HTTPResponse(server.response_payload, self, self)
-        req = HTTPRequest(server.request_payload, self.instream,
-                          self.request_headers, resp)
+        req = HTTPRequest(server.request_payload, instream,
+                          env, resp)
         publish(req)
+
+    def create_environment(self):
+        request_data = self.request_data
+        path = request_data.path
+        channel = self.channel
+        server = channel.server
+
+        while path and path[0] == '/':
+            path = path[1:]
+        # already unquoted!
+        # if '%' in path:
+        #     path = unquote(path)
+
+        env = {}
+        env['REQUEST_METHOD'] = request_data.command.upper()
+        env['SERVER_PORT'] = str(server.port)
+        env['SERVER_NAME'] = server.server_name
+        env['SERVER_SOFTWARE'] = server.SERVER_IDENT
+        env['SERVER_PROTOCOL'] = "HTTP/%s" % self.version
+        env['channel.creation_time'] = channel.creation_time
+##        if self.uri_base=='/':
+        env['SCRIPT_NAME']=''
+        env['PATH_INFO']='/' + path
+##        else:
+##            env['SCRIPT_NAME'] = self.uri_base
+##            try:
+##                path_info = '/'.split(self.uri_base[1:],1)[1]
+##            except:
+##                path_info=''
+##            env['PATH_INFO']=path_info
+        #env['PATH_TRANSLATED'] = ospath.normpath(ospath.join(
+        #        workdir, env['PATH_INFO']))
+        query = request_data.query
+        if query:
+            env['QUERY_STRING'] = query
+        env['GATEWAY_INTERFACE'] = 'CGI/1.1'
+        env['REMOTE_ADDR'] = channel.addr[0]
+
+        # If we're using a resolving logger, try to get the
+        # remote host from the resolver's cache.
+##        if hasattr(server.logger, 'resolver'):
+##            dns_cache=server.logger.resolver.cache
+##            if dns_cache.has_key(env['REMOTE_ADDR']):
+##                remote_host=dns_cache[env['REMOTE_ADDR']][2]
+##                if remote_host is not None:
+##                    env['REMOTE_HOST']=remote_host
+
+        env_has = env.has_key
+        
+        for key, value in request_data.headers.items():
+            value = value.strip()
+            mykey = rename_headers.get(key, None)
+            if mykey is None:
+                mykey = 'HTTP_%s' % key
+            if not env_has(mykey):
+                env[mykey] = value
+        return env
+
 
 
 


=== Zope3/lib/python/Zope/Server/dual_mode_channel.py 1.1.2.2 => 1.1.2.3 ===
     # Create a tempfile if the pending output data gets larger
     # than outbuf_overflow.
-    outbuf_overflow = 4100000  # About 4 MB
+    outbuf_overflow = 1050000  # A little over 1 MB
 
     # will_close is set to 1 to close the socket.
     will_close = 0
@@ -114,16 +114,13 @@
             raise t
         asyncore.dispatcher.handle_error(self)
 
-    def handle_comm_error(self, msg=None):
+    def handle_comm_error(self):
         """
         Designed for handling communication errors that occur
         during asynchronous transfers *only*.  Probably should log
         this, but in a different place.
         """
-        print '--- communication error ---'
-        import traceback
-        traceback.print_exc()
-        self.close()
+        self.handle_error()
 
     def set_sync(self):
         self.async_mode = 0
@@ -135,11 +132,12 @@
     def sync_write(self, data):
         if data:
             self.outbuf.append(data)
-        if len(self.outbuf) >= self.send_bytes:
-            while self._flush_some():
-                # Send what we can without blocking.
-                # We propogate errors to the application on purpose.
-                pass
+        while len(self.outbuf) >= self.send_bytes:
+            # Send what we can without blocking.
+            # We propagate errors to the application on purpose
+            # (to prevent unnecessary work).
+            if not self._flush_some():
+                break
         return len(data)
 
     def sync_flush(self):