[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server - Buffers.py:1.1.2.1 Chunking.py:1.1.2.1 DualModeChannel.py:1.1.2.1 Utilities.py:1.1.2.1 HTTPServer.py:1.1.2.15 chunking.py:NONE dual_mode_channel.py:NONE

Shane Hathaway shane@cvs.zope.org
Thu, 31 Jan 2002 11:33:47 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/Server
In directory cvs.zope.org:/tmp/cvs-serv14226

Modified Files:
      Tag: Zope-3x-branch
	HTTPServer.py 
Added Files:
      Tag: Zope-3x-branch
	Buffers.py Chunking.py DualModeChannel.py Utilities.py 
Removed Files:
      Tag: Zope-3x-branch
	chunking.py dual_mode_channel.py 
Log Message:
- Parse newlines, rather than requiring CRLFs, in the HTTP server.

- Fixed chunking to use hexadecimal.

- Renamed modules and classes to fit with Zope 3 naming conventions.

- Put buffer classes in their own module.

- Provided a way to insert TCPWatch in the HTTP server tests.



=== Added File Zope3/lib/python/Zope/Server/Buffers.py ===
# Copyright (c) 2001 Zope Corporation and Contributors.  All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 
# FOR A PARTICULAR PURPOSE.


try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO


# COPY_BYTES controls the size of the temporary strings used when shuffling
# data from one file to another.
COPY_BYTES = 1 << 18  # 256K

# The maximum number of bytes to buffer in a simple string.
STRBUF_LIMIT = 8192


class FileBasedBuffer:
    """A FIFO buffer of bytes kept in a seekable file-like object.

    The file's current position marks the start of the unread data, and
    new data is always written at the end of the file.  ``remain`` (also
    returned by len()) is the number of unread bytes.  Subclasses provide
    newfile(), which prune() uses to obtain a replacement file.
    """

    remain = 0

    def __init__(self, file, from_buffer=None):
        """Wrap `file`, optionally copying another buffer's contents.

        If `from_buffer` is given, the whole of its underlying file
        (including bytes already consumed) is copied into `file`.  Both
        files are then left positioned at `from_buffer`'s read offset, so
        only the unconsumed bytes count toward len().  This assumes
        `file` starts out empty, since `remain` is derived from
        file.tell() after the copy.
        """
        self.file = file
        if from_buffer is not None:
            from_file = from_buffer.getfile()
            read_pos = from_file.tell()
            from_file.seek(0)
            while 1:
                data = from_file.read(COPY_BYTES)
                if not data:
                    break
                file.write(data)
            self.remain = int(file.tell() - read_pos)
            from_file.seek(read_pos)
            file.seek(read_pos)

    def __len__(self):
        return self.remain

    def append(self, s):
        """Add `s` to the end of the buffer without moving the read position."""
        file = self.file
        read_pos = file.tell()
        file.seek(0, 2)
        file.write(s)
        file.seek(read_pos)
        self.remain = self.remain + len(s)

    def get(self, bytes=-1, skip=0):
        """Return up to `bytes` unread bytes (all of them if `bytes` < 0).

        If `skip` is true, the returned bytes are consumed.  Otherwise the
        read position is restored, making the call a peek.
        """
        file = self.file
        if not skip:
            read_pos = file.tell()
        if bytes < 0:
            # Read all
            res = file.read()
        else:
            res = file.read(bytes)
        if skip:
            self.remain -= len(res)
        else:
            file.seek(read_pos)
        return res

    def skip(self, bytes, allow_prune=0):
        """Consume `bytes` bytes without returning them.

        `allow_prune` is accepted for interface compatibility with
        OverflowableBuffer.skip() and is ignored here.  Raises ValueError
        if fewer than `bytes` bytes remain.
        """
        if self.remain < bytes:
            raise ValueError(
                "Can't skip %d bytes in buffer of %d bytes" %
                (bytes, self.remain))
        self.file.seek(bytes, 1)
        self.remain = self.remain - bytes

    def newfile(self):
        """Return a new, empty file-like object; subclasses must override."""
        # A string exception cannot be raised on Python 2.6+ and cannot be
        # caught by class, so use the standard exception instead.
        raise NotImplementedError

    def prune(self):
        """Discard consumed data by copying the unread bytes to a new file.

        After pruning, the new file is positioned at its start, so the
        unread bytes are still returned by get() and counted by len().
        """
        file = self.file
        if self.remain == 0:
            read_pos = file.tell()
            file.seek(0, 2)
            sz = file.tell()
            file.seek(read_pos)
            if sz == 0:
                # Nothing to prune.
                return
        nf = self.newfile()
        while 1:
            data = file.read(COPY_BYTES)
            if not data:
                break
            nf.write(data)
        # The copy left nf positioned at its end.  Rewind it so the read
        # position points at the first unread byte again; otherwise get()
        # would return nothing and copies made from this buffer would
        # treat all of its data as already consumed.
        nf.seek(0)
        self.file = nf

    def getfile(self):
        return self.file



class TempfileBasedBuffer(FileBasedBuffer):
    """A FileBasedBuffer backed by an anonymous temporary file on disk."""

    def __init__(self, from_buffer=None):
        # Create the backing file first; the base initializer then copies
        # any data from `from_buffer` into it.
        backing_file = self.newfile()
        FileBasedBuffer.__init__(self, backing_file, from_buffer)

    def newfile(self):
        """Return a new binary temporary file opened for reading and writing."""
        import tempfile
        return tempfile.TemporaryFile('w+b')



class StringIOBasedBuffer(FileBasedBuffer):
    """A FileBasedBuffer kept in memory in a StringIO object."""

    def __init__(self, from_buffer=None):
        if from_buffer is None:
            # Nothing to copy: skip FileBasedBuffer.__init__ and rely on
            # the inherited class default remain = 0.
            self.file = StringIO()
            return
        FileBasedBuffer.__init__(self, StringIO(), from_buffer)

    def newfile(self):
        """Return a fresh in-memory file (prune() copies unread data into it)."""
        return StringIO()



class OverflowableBuffer:
    """
    This buffer implementation has four stages:
    - No data
    - String-based buffer
    - StringIO-based buffer
    - Temporary file storage
    The first two stages are fastest for simple transfers.

    While the total stays under STRBUF_LIMIT bytes, data is kept in a
    plain string.  Beyond that it moves to a StringIOBasedBuffer, which is
    replaced by a TempfileBasedBuffer once it holds `overflow` bytes or
    more.  prune() switches back to StringIO when the data shrinks below
    `overflow` again.
    """

    overflowed = 0  # True while self.buf is a TempfileBasedBuffer.
    buf = None      # File-based buffer; None while strbuf holds the data.
    strbuf = ''  # String-based buffer.

    def __init__(self, overflow):
        # overflow is the maximum to be stored in a StringIO buffer; at or
        # above it, data moves to a temporary file.
        self.overflow = overflow

    def __len__(self):
        buf = self.buf
        if buf is not None:
            return len(buf)
        else:
            return len(self.strbuf)

    def _create_buffer(self):
        """Move strbuf's contents into a new file-based buffer and return it."""
        strbuf = self.strbuf
        if len(strbuf) >= self.overflow:
            self._set_large_buffer()
        else:
            self._set_small_buffer()
        buf = self.buf
        if strbuf:
            buf.append(self.strbuf)
            self.strbuf = ''
        return buf

    def _set_small_buffer(self):
        # Copies the current buffer (if any) into a new StringIO buffer.
        self.buf = StringIOBasedBuffer(self.buf)
        self.overflowed = 0

    def _set_large_buffer(self):
        # Copies the current buffer (if any) into a new temporary file.
        self.buf = TempfileBasedBuffer(self.buf)
        self.overflowed = 1

    def append(self, s):
        """Add `s` to the end of the buffer, upgrading storage as it grows."""
        buf = self.buf
        if buf is None:
            strbuf = self.strbuf
            if len(strbuf) + len(s) < STRBUF_LIMIT:
                self.strbuf = strbuf + s
                return
            buf = self._create_buffer()
        buf.append(s)
        sz = len(buf)
        if not self.overflowed:
            if sz >= self.overflow:
                self._set_large_buffer()

    def get(self, bytes=-1, skip=0):
        """Return up to `bytes` unread bytes (all of them if `bytes` < 0).

        If `skip` is true, the returned bytes are consumed; otherwise this
        is a peek that leaves the buffer unchanged.
        """
        buf = self.buf
        if buf is None:
            strbuf = self.strbuf
            if not skip:
                # Peek at the string buffer directly, honoring the size
                # limit the same way FileBasedBuffer.get() does.
                if bytes >= 0:
                    return strbuf[:bytes]
                return strbuf
            buf = self._create_buffer()
        return buf.get(bytes, skip)

    def skip(self, bytes, allow_prune=0):
        """Consume `bytes` bytes.

        With `allow_prune`, skipping exactly the whole string buffer just
        resets it instead of converting it to a file-based buffer.
        """
        buf = self.buf
        if buf is None:
            strbuf = self.strbuf
            if allow_prune and bytes == len(strbuf):
                # We could slice instead of converting to
                # a buffer, but that would eat up memory in
                # large transfers.
                self.strbuf = ''
                return
            buf = self._create_buffer()
        buf.skip(bytes, allow_prune)

    def prune(self):
        """
        A potentially expensive operation that removes all data
        already retrieved from the buffer.
        """
        buf = self.buf
        if buf is None:
            # strbuf never holds consumed data.  get() without skip only
            # peeks, and skip() either resets strbuf entirely or first
            # converts it to a file-based buffer.  So there is nothing to
            # prune, and clearing strbuf here would discard unread data.
            return
        buf.prune()
        if self.overflowed:
            sz = len(buf)
            if sz < self.overflow:
                # Revert to a faster buffer.
                self._set_small_buffer()

    def getfile(self):
        """Return the underlying file, creating a file-based buffer if needed."""
        buf = self.buf
        if buf is None:
            buf = self._create_buffer()
        return buf.getfile()


=== Added File Zope3/lib/python/Zope/Server/Chunking.py ===
# Copyright (c) 2001 Zope Corporation and Contributors.  All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.


from Utilities import find_double_newline


class ChunkedReceiver:
    """Decodes a body sent with HTTP/1.1 "chunked" transfer coding.

    Decoded chunk data is appended to `buf`.  Chunk-size lines, chunk
    extensions and the trailer are consumed but not written to `buf`.
    Both CRLF and bare LF line endings are accepted.
    """

    chunk_remainder = 0      # Bytes still expected in the current chunk.
    control_line = ''        # Partial chunk-size line carried between calls.
    all_chunks_received = 0  # Set once the zero-size last chunk is seen.
    trailer = ''             # Partial trailer, or the final one once completed.
    completed = 0            # Set once the whole chunked body has been read.

    # Not enforced yet:
    # max_control_line = 1024
    # max_trailer = 65536

    # Characters allowed in a chunk size (RFC 7230 section 4.1: 1*HEXDIG).
    _hex_digits = '0123456789abcdefABCDEF'

    def __init__(self, buf):
        # buf receives the decoded data; it must provide append() and
        # getfile().
        self.buf = buf

    def _parse_chunk_size(self, line):
        """Return the size declared by a stripped, non-empty chunk-size line.

        Any chunk extension after ';' is discarded.  Raises ValueError
        unless the size consists of one or more hexadecimal digits.  Plain
        int(x, 16) would also accept forms such as '0x1a', '+5', '-5' or
        '1_0'.  A front-end proxy may read those differently, which opens
        the door to request smuggling.
        """
        semi = line.find(';')
        if semi >= 0:
            # discard extension info.
            line = line[:semi]
        size_text = line.strip()
        if not size_text:
            raise ValueError('Missing chunk size')
        for c in size_text:
            if c not in self._hex_digits:
                raise ValueError('Invalid chunk size: %r' % size_text)
        return int(size_text, 16)  # hexadecimal

    def received(self, s):
        """Feed `s` into the decoder and return how many bytes were consumed.

        The result is smaller than len(s) only when the end of the chunked
        body falls inside `s`; the unconsumed tail is left for the caller.
        Once the body is complete, every further call returns 0.  Raises
        ValueError on a malformed chunk-size line.
        """
        if self.completed:
            return 0
        orig_size = len(s)
        while s:
            rm = self.chunk_remainder
            if rm > 0:
                # Receive the remainder of a chunk.
                to_write = s[:rm]
                self.buf.append(to_write)
                written = len(to_write)
                s = s[written:]
                self.chunk_remainder -= written
            elif not self.all_chunks_received:
                # Receive a control line.
                s = self.control_line + s
                pos = s.find('\n')
                if pos < 0:
                    # Control line not finished.
                    self.control_line = s
                    s = ''
                else:
                    # Control line finished.
                    line = s[:pos]
                    s = s[pos + 1:]
                    self.control_line = ''
                    line = line.strip()
                    if line:
                        # Begin a new chunk.
                        sz = self._parse_chunk_size(line)
                        if sz > 0:
                            # Start a new chunk.
                            self.chunk_remainder = sz
                        else:
                            # Finished chunks.
                            self.all_chunks_received = 1
                    # else: a blank line (the line ending after a chunk's
                    # data); expect another control line.
            else:
                # Receive the trailer.
                trailer = self.trailer + s
                if trailer[:2] == '\r\n':
                    # No trailer.
                    self.completed = 1
                    return orig_size - (len(trailer) - 2)
                elif trailer[:1] == '\n':
                    # No trailer.
                    self.completed = 1
                    return orig_size - (len(trailer) - 1)
                pos = find_double_newline(trailer)
                if pos < 0:
                    # Trailer not finished.
                    self.trailer = trailer
                    s = ''
                else:
                    # Finished the trailer.
                    self.completed = 1
                    self.trailer = trailer[:pos]
                    return orig_size - (len(trailer) - pos)
        return orig_size

    def getfile(self):
        return self.buf.getfile()



=== Added File Zope3/lib/python/Zope/Server/DualModeChannel.py ===
# Copyright (c) 2001 Zope Corporation and Contributors.  All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 
# FOR A PARTICULAR PURPOSE.

import asyncore
import socket
from time import time
from UserDict import UserDict

from medusa.thread import select_trigger
from Adjustments import default_adj
from Buffers import OverflowableBuffer


# Create the main trigger if it doesn't exist yet.
# This process-wide trigger is the fallback that
# AlternateSocketMapMixin.pull_trigger() uses when a channel's socket map
# has no pull_trigger of its own.  It is created as a side effect of
# importing this module; the None check avoids replacing a trigger that
# was already set up elsewhere.
if select_trigger.the_trigger is None:
    select_trigger.the_trigger = select_trigger.trigger()


class AlternateSocketMapMixin:
    """Mixin for asyncore.dispatcher to more easily support
    alternate socket maps.

    Set the `socket_map` attribute (on the class or the instance) to a map
    other than asyncore's default.  add_channel() and del_channel() fall
    back to it whenever no explicit map is passed.
    """

    socket_map = None

    def _choose_map(self, map):
        # Only None means "not given"; an explicit map, even an empty
        # one, is used as-is.
        if map is not None:
            return map
        return self.socket_map

    def add_channel(self, map=None):
        asyncore.dispatcher.add_channel(self, self._choose_map(map))

    def del_channel(self, map=None):
        asyncore.dispatcher.del_channel(self, self._choose_map(map))

    def pull_trigger(self):
        """Pull the trigger associated with this channel's socket map.

        Uses the map's own pull_trigger attribute when present (for
        example, SocketMapWithTrigger), otherwise the global medusa
        trigger.
        """
        trigger = getattr(self.socket_map, 'pull_trigger', None)
        if trigger is None:
            trigger = select_trigger.the_trigger.pull_trigger
        trigger()


class ASMTrigger (AlternateSocketMapMixin, select_trigger.trigger):
    """Trigger registered in an alternate socket map.

    Takes the trigger's own pull_trigger explicitly.  Otherwise the
    mixin's pull_trigger would win through the base-class order and call
    socket_map.pull_trigger, which SocketMapWithTrigger binds back to this
    very trigger.
    """

    pull_trigger = select_trigger.trigger.pull_trigger

    def __init__(self, socket_map):
        # Set socket_map before initializing the trigger, so that the
        # mixin's add_channel() (which reads self.socket_map when no map
        # is passed) can see it if the trigger registers itself during
        # __init__ -- presumably it does; confirm against medusa.
        self.socket_map = socket_map
        select_trigger.trigger.__init__(self)


class SocketMapWithTrigger (UserDict):
    """A dict-like socket map that carries its own trigger.

    Channels using AlternateSocketMapMixin with this map call
    map.pull_trigger(), which pulls an ASMTrigger bound to this map
    instead of the global medusa trigger.
    """

    def __init__(self):
        # Initialize the dict first: creating the ASMTrigger may register
        # it in this map, and UserDict.__init__ would reset the contents.
        UserDict.__init__(self)
        own_trigger = ASMTrigger(self)
        self.pull_trigger = own_trigger.pull_trigger


class DualModeChannel (AlternateSocketMapMixin, asyncore.dispatcher):
    """
    The channel switches between asynchronous and synchronous mode.

    In asynchronous mode (async_mode true), the asyncore callbacks
    (writable/readable/handle_write/handle_read) drive I/O.  In
    synchronous mode those callbacks report nothing to do and return
    immediately; the application writes through sync_write() and
    sync_flush() instead.  Outgoing data is queued in self.outbuf (an
    OverflowableBuffer) in both modes.  Subclasses receive incoming data
    by overriding received().
    """

    # will_close is set to 1 to close the socket.
    # While set, readable() returns false; in async mode the socket is
    # closed once outbuf is empty (see inner_handle_write).
    will_close = 0

    # boolean: async or sync mode
    async_mode = 1

    def __init__(self, server, conn, addr, adj=None, socket_map=None):
        # adj supplies outbuf_overflow, recv_bytes and send_bytes
        # (default_adj when omitted).  socket_map is an alternate asyncore
        # map; None presumably selects asyncore's global map.
        self.server = server
        self.addr = addr
        if adj is None:
            adj = default_adj
        self.adj = adj
        # Must be set before asyncore.dispatcher.__init__, which (given a
        # connection) presumably registers the channel via add_channel();
        # the mixin's add_channel() reads self.socket_map.
        self.socket_map = socket_map
        self.outbuf = OverflowableBuffer(adj.outbuf_overflow)
        self.creation_time = time()
        asyncore.dispatcher.__init__(self, conn)

    def get_sync_streams(self):
        # NOTE(review): `synchronous_streams` is neither defined nor
        # imported in this module, so this call raises NameError unless
        # the name is provided some other way -- confirm or remove.
        return synchronous_streams(self)

    #
    # ASYNCHRONOUS METHODS
    #

    def handle_close(self):
        self.close()

    def writable(self):
        # Returns will_close or the outbuf object itself; the buffer's
        # truthiness comes from its length (OverflowableBuffer.__len__).
        if not self.async_mode:
            return 0
        return self.will_close or self.outbuf

    def handle_write(self):
        if not self.async_mode:
            return
        self.inner_handle_write()

    def inner_handle_write(self):
        """Send some pending output, or close once it is drained and will_close is set."""
        if self.outbuf:
            try:
                self._flush_some()
            except socket.error:
                self.handle_comm_error()
        elif self.will_close:
            self.close()

    def readable(self):
        if not self.async_mode:
            return 0
        return not self.will_close

    def handle_read(self):
        if not self.async_mode:
            return
        self.inner_handle_read()

    def inner_handle_read(self):
        """Read up to adj.recv_bytes bytes and pass them to received()."""
        try:
            data = self.recv(self.adj.recv_bytes)
        except socket.error:
            self.handle_comm_error()
            return
        self.received(data)

    def received(self, data):
        """
        Override to receive data in async mode.
        """
        pass

    def handle_comm_error(self):
        """
        Designed for handling communication errors that occur
        during asynchronous operations *only*.  Probably should log
        this, but in a different place.
        """
        self.handle_error()

    def set_sync(self):
        # Switch to synchronous mode: the asyncore callbacks go idle.
        self.async_mode = 0

    #
    # SYNCHRONOUS METHODS
    #

    def sync_write(self, data):
        """Queue `data`, then send while at least adj.send_bytes bytes are pending.

        Stops early when a send transmits nothing, leaving the rest in
        outbuf.  socket.error propagates to the caller.
        """
        if data:
            self.outbuf.append(data)
        while len(self.outbuf) >= self.adj.send_bytes:
            # Send what we can without blocking.
            # We propagate errors to the application on purpose
            # (to prevent unnecessary work).
            if not self._flush_some():
                break

    def sync_flush(self):
        """
        Pauses the application while outbuf is flushed.
        Normally not a good thing to do.

        The socket is switched to blocking mode for the duration of the
        flush and switched back to non-blocking afterwards, even if a
        socket.error propagates.
        """
        blocked = 0
        try:
            while self.outbuf:
                # We propagate errors to the application on purpose.
                if not blocked:
                    self.socket.setblocking(1)
                    blocked = 1
                self._flush_some()
        finally:
            if blocked:
                self.socket.setblocking(0)

    def set_async(self):
        # Switch back to asynchronous mode and pull the trigger so the
        # asyncore loop re-evaluates readable()/writable().
        self.async_mode = 1
        self.pull_trigger()

    #
    # METHODS USED IN BOTH MODES
    #

    def _flush_some(self):
        """Send at most adj.send_bytes bytes from outbuf.

        Returns 1 if any bytes were sent (and removes them from outbuf),
        otherwise 0.  socket.error from send() propagates.
        """
        outbuf = self.outbuf
        if outbuf:
            chunk = outbuf.get(self.adj.send_bytes)
            num_sent = self.send(chunk)
            if num_sent:
                outbuf.skip(num_sent, 1)
                return 1
        return 0

    def close_when_done(self):
        """Close the channel once all queued output has been sent."""
        if self.async_mode:
            self.will_close = 1
            self.pull_trigger()
        else:
            # We might be able to close immediately.
            while self._flush_some():
                pass
            if not self.outbuf:
                # Quick exit.
                self.close()
            else:
                # Wait until outbuf is flushed: hand the remaining output
                # to the asynchronous side, which closes when done.
                self.will_close = 1
                self.async_mode = 1
                self.pull_trigger()


# Bound to thread.allocate_lock by the first SimultaneousModeChannel that
# is constructed, so importing this module does not import `thread`.
allocate_lock = None


class SimultaneousModeChannel (DualModeChannel):
    """
    The channel operates in synchronous mode with an asynchronous
    helper.  The asynchronous callbacks empty the output buffer
    and fill the input buffer.

    A write lock serializes output.  The synchronous writer holds it
    while writing or flushing, and the asynchronous handle_write() skips
    its turn when it cannot acquire the lock without blocking.
    """

    def __init__(self, server, conn, addr, adj=None, socket_map=None):
        global allocate_lock
        if allocate_lock is None:
            from thread import allocate_lock
        lock = allocate_lock()
        # Only the lock's bound methods are kept on the instance.
        (self._writelock_acquire,
         self._writelock_release,
         self._writelock_locked) = (lock.acquire, lock.release, lock.locked)
        DualModeChannel.__init__(self, server, conn, addr, adj, socket_map)

    #
    # ASYNCHRONOUS METHODS
    #

    def writable(self):
        # Writable when closing, or when output is pending and no
        # synchronous writer currently holds the lock.
        result = self.will_close
        if not result:
            result = self.outbuf and not self._writelock_locked()
        return result

    def handle_write(self):
        # Non-blocking acquire: if a synchronous method is writing, let it
        # finish and try again on a later loop iteration.
        if self._writelock_acquire(0):
            try:
                self.inner_handle_write()
            finally:
                self._writelock_release()

    def readable(self):
        return not self.will_close

    def handle_read(self):
        self.inner_handle_read()

    def set_sync(self):
        # Both sides are always active, so there is no mode to switch.
        pass

    #
    # SYNCHRONOUS METHODS
    #

    def _call_with_writelock(self, method, *args):
        """Call `method(self, *args)` while holding the write lock."""
        self._writelock_acquire()
        try:
            method(self, *args)
        finally:
            self._writelock_release()

    def sync_write(self, data):
        self._call_with_writelock(DualModeChannel.sync_write, data)

    def sync_flush(self):
        self._call_with_writelock(DualModeChannel.sync_flush)

    def set_async(self):
        # Both sides are always active, so there is no mode to switch.
        pass

    #
    # METHODS USED IN BOTH MODES
    #

    def close_when_done(self):
        # The asynchronous helper closes the socket once outbuf is empty.
        self.will_close = 1
        self.pull_trigger()




=== Added File Zope3/lib/python/Zope/Server/Utilities.py ===
# Copyright (c) 2001 Zope Corporation and Contributors.  All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 
# FOR A PARTICULAR PURPOSE.


def find_double_newline(s):
    """Returns the position just after a double newline in the given string.

    Both "\\n\\r\\n" (which also matches CRLF CRLF) and "\\n\\n" count as a
    double newline; the earliest one wins.  Returns -1 if there is none.
    """
    ends = []
    for marker in ('\n\r\n', '\n\n'):
        start = s.find(marker)
        if start >= 0:
            ends.append(start + len(marker))
    if ends:
        return min(ends)
    return -1




=== Zope3/lib/python/Zope/Server/HTTPServer.py 1.1.2.14 => 1.1.2.15 ===
 """
 
-SIMULT_MODE = 0
+SIMULT_MODE = 0  # Turn on to enable experimental simultaneous channel mode.
 
 import asyncore
 import re
@@ -25,12 +25,14 @@
 from medusa import logger
 
 if SIMULT_MODE:
-    from dual_mode_channel import simultaneous_mode_channel as \
+    from DualModeChannel import SimultaneousModeChannel as \
          channel_base_class
 else:
-    from dual_mode_channel import dual_mode_channel as channel_base_class
+    from DualModeChannel import DualModeChannel as channel_base_class
 
-from dual_mode_channel import AlternateSocketMapMixin, OverflowableBuffer
+from DualModeChannel import AlternateSocketMapMixin
+from Buffers import OverflowableBuffer
+from Utilities import find_double_newline
 from Adjustments import default_adj
 from IHeaderOutput import IHeaderOutput
 from ITask import ITask
@@ -238,7 +240,7 @@
 
 
 
-class http_request_data:
+class HTTPRequestParser:
     """
     A structure that collects the HTTP request.
     """
@@ -259,6 +261,7 @@
         """
         adj is an Adjustments object.
         """
+        self.headers = {}
         self.adj = adj
 
     def received(self, data):
@@ -275,11 +278,11 @@
         if br is None:
             # In header.
             s = self.header_plus + data
-            index = s.find('\r\n\r\n')
+            index = find_double_newline(s)
             if index >= 0:
                 # Header finished.
                 header_plus = s[:index]
-                consumed = len(data) - (len(s) - (index + 4))
+                consumed = len(data) - (len(s) - index)
                 self.in_header = 0
                 # Remove preceeding blank lines.
                 header_plus = header_plus.lstrip()
@@ -308,18 +311,18 @@
         Parses the header_plus block of text (the headers plus the
         first line of the request).
         """
-        index = header_plus.find('\r\n')
+        index = header_plus.find('\n')
         if index >= 0:
-            first_line = header_plus[:index]
-            header = header_plus[index + 2:]
+            first_line = header_plus[:index].rstrip()
+            header = header_plus[index + 1:]
         else:
-            first_line = header_plus
+            first_line = header_plus.rstrip()
             header = ''
         self.first_line = first_line
         self.header = header
 
         lines = self.get_header_lines()
-        self.headers = headers = {}
+        headers = self.headers
         for line in lines:
             index = line.find(':')
             if index > 0:
@@ -340,7 +343,7 @@
         if version == '1.1':
             te = headers.get('TRANSFER_ENCODING', '')
             if te == 'chunked':
-                from chunking import ChunkedReceiver
+                from Chunking import ChunkedReceiver
                 self.chunked = 1
                 buf = OverflowableBuffer(self.adj.inbuf_overflow)
                 self.body_rcv = ChunkedReceiver(buf)
@@ -357,7 +360,7 @@
         Splits the header into lines, putting multi-line headers together.
         """
         r = []
-        lines = self.header.split('\r\n')
+        lines = self.header.split('\n')
         for line in lines:
             if line and line[0] in ' \t':	
                 r[-1] = r[-1] + line[1:]
@@ -418,7 +421,7 @@
     active_channels = {}        # Class-specific channel tracker
     next_channel_cleanup = [0]  # Class-specific cleanup time
 
-    proto_request = None      # An http_request_data instance
+    proto_request = None      # An HTTPRequestParser instance
     ready_requests = None     # A list
     last_activity = 0         # Time of last activity
     running_tasks = 0         # boolean
@@ -478,7 +481,7 @@
         preq = self.proto_request
         while data:
             if preq is None:
-                preq = http_request_data(self.adj)
+                preq = HTTPRequestParser(self.adj)
             n = preq.received(data)
             if preq.completed:
                 # The request is ready to use.

=== Removed File Zope3/lib/python/Zope/Server/chunking.py ===

=== Removed File Zope3/lib/python/Zope/Server/dual_mode_channel.py ===