[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server - IStreamConsumer.py:1.1.2.1 ITaskDispatcher.py:1.1.2.1 ServerBase.py:1.1.2.1 Adjustments.py:1.1.2.3 Buffers.py:1.1.2.2 Chunking.py:1.1.2.2 DualModeChannel.py:1.1.2.2 HTTPServer.py:1.1.2.16 IHeaderOutput.py:1.1.2.2 ITask.py:1.1.2.2 PublisherServers.py:1.1.2.8 TaskThreads.py:1.1.2.7 Utilities.py:1.1.2.2 ZLogIntegration.py:1.1.2.2 __init__.py:1.1.2.4

Shane Hathaway shane@cvs.zope.org
Fri, 8 Feb 2002 10:06:04 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/Server
In directory cvs.zope.org:/tmp/cvs-serv11304

Modified Files:
      Tag: Zope-3x-branch
	Adjustments.py Buffers.py Chunking.py DualModeChannel.py 
	HTTPServer.py IHeaderOutput.py ITask.py PublisherServers.py 
	TaskThreads.py Utilities.py ZLogIntegration.py __init__.py 
Added Files:
      Tag: Zope-3x-branch
	IStreamConsumer.py ITaskDispatcher.py ServerBase.py 
Log Message:
- Factored the non-HTTP-specific code into ServerBase, in preparation for
  an FTP server.

- Added explicit interfaces and docstrings.

- Updated licenses.


=== Added File Zope3/lib/python/Zope/Server/IStreamConsumer.py ===
# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 
# FOR A PARTICULAR PURPOSE.


from Interface import Interface, Attribute

class IStreamConsumer (Interface):
    """Consumes a data stream until reaching a completion point.

    The actual amount to be consumed might not be known ahead of time.
    Callers feed data through received() and check the 'completed'
    attribute to find out when the consumer wants no more input.
    """

    def received(data):
        """Accepts data, returning the number of bytes consumed.

        The return value may be smaller than len(data); the caller is
        then responsible for the unconsumed tail (data[n:]).
        """

    # Flag polled by callers after each received() call.
    completed = Attribute(
        'completed', 'Set to a true value when finished consuming data.')


=== Added File Zope3/lib/python/Zope/Server/ITaskDispatcher.py ===
# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 
# FOR A PARTICULAR PURPOSE.


from Interface import Interface

class ITaskDispatcher (Interface):
    """An object that accepts tasks and dispatches them to threads.
    """

    def setThreadCount(count):
        """Sets the number of handler threads.

        count is the desired number of handler threads.
        """

    def addTask(task):
        """Receives a task and dispatches it to a thread.

        Note that, depending on load, a task may have to wait a
        while for its turn.
        """

    def shutdown(cancel_pending=1, timeout=5):
        """Shuts down all handler threads and may cancel pending tasks.

        NOTE(review): presumably a true cancel_pending cancels tasks
        that have not started yet, and timeout is the number of seconds
        to wait for the threads to stop -- confirm against the
        implementation, which is not part of this file.
        """

    def getPendingTasksEstimate():
        """Returns an estimate of the number of tasks waiting to be serviced.

        This method may be useful for monitoring purposes.  If the
        number of pending tasks is continually climbing, your server
        is becoming overloaded and the operator should be notified.
        """



=== Added File Zope3/lib/python/Zope/Server/ServerBase.py ===
# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 
# FOR A PARTICULAR PURPOSE.


import asyncore
import os
import socket
import sys
import time
from thread import allocate_lock

from DualModeChannel import AlternateSocketMapMixin
from IStreamConsumer import IStreamConsumer
from Adjustments import default_adj


# Set the ZOPE_SERVER_SIMULT_MODE environment variable (to any non-empty
# value) to select the experimental simultaneous channel mode, which may
# improve or degrade throughput depending on load characteristics.
# The choice is made once, when this module is imported.
if os.environ.get('ZOPE_SERVER_SIMULT_MODE'):
    from DualModeChannel import SimultaneousModeChannel as \
         channel_base_class
else:
    from DualModeChannel import DualModeChannel as channel_base_class



class FixedStreamReceiver:
    """Stream consumer that accepts a fixed, known number of bytes.

    The bytes are appended to the buffer supplied at construction time;
    'completed' becomes true once the expected amount has been taken.
    """

    __implements__ = IStreamConsumer

    completed = 0

    def __init__(self, cl, buf):
        # cl: number of bytes still expected; buf: object providing
        # append() and getfile().
        self.buf = buf
        self.remain = cl

    def received(self, data):
        """Stores up to the remaining byte count; returns bytes consumed."""
        remaining = self.remain
        if remaining < 1:
            # Nothing left to take; flag completion so callers never spin.
            self.completed = 1
            return 0
        size = len(data)
        if size < remaining:
            # The whole chunk belongs to this stream and more is expected.
            self.buf.append(data)
            self.remain = remaining - size
            return size
        # This chunk finishes the stream; take only what is still owed.
        self.buf.append(data[:remaining])
        self.remain = 0
        self.completed = 1
        return remaining

    def getfile(self):
        """Returns the file-like view of the collected data."""
        return self.buf.getfile()



# Module-wide lock guarding every channel's "running_tasks" flag and
# "ready_requests" list; acquired in queue_request() and end_task().
running_lock = allocate_lock()



class ServerChannelBase (channel_base_class):
    """Base class for a high-performance, mixed-mode server-side channel.

    Incoming data is fed to an instance of parser_class.  Each completed,
    non-empty request is turned into a task_class instance and handed to
    the server.  Requests arriving on one channel are serviced one at a
    time: while a task is running, further requests wait in
    ready_requests.
    """

    parser_class = None       # Subclasses must provide a parser class
    task_class = None         # ... and a task class.

    # Both containers are mutable class attributes, so they are shared
    # by every class that does not override them.
    active_channels = {}        # Class-specific channel tracker
    next_channel_cleanup = [0]  # Class-specific cleanup time

    proto_request = None      # A request parser instance
    ready_requests = None     # A list
    last_activity = 0         # Time of last activity
    running_tasks = 0         # boolean

    #
    # ASYNCHRONOUS METHODS (incl. __init__)
    #

    def __init__(self, server, conn, addr, adj=None, socket_map=None):
        channel_base_class.__init__(self, server, conn, addr, adj, socket_map)
        self.last_activity = t = self.creation_time
        # Every new connection is an opportunity to reap idle channels.
        self.check_maintenance(t)

    def add_channel(self, map=None):
        """This hook keeps track of opened channels.
        """
        channel_base_class.add_channel(self, map)
        self.active_channels[self._fileno] = self

    def del_channel(self, map=None):
        """This hook keeps track of closed channels.
        """
        # Remember the descriptor first: newer asyncore versions reset
        # self._fileno to None inside dispatcher.del_channel(), which
        # would leave this channel in active_channels forever.
        fd = self._fileno
        channel_base_class.del_channel(self, map)
        try:
            del self.active_channels[fd]
        except KeyError:
            # Already removed (or never registered); nothing to do.
            pass

    def check_maintenance(self, now):
        """Performs maintenance if necessary.

        'now' is compared against the shared next-cleanup time; when it
        is due, the next cleanup is scheduled adj.cleanup_interval later
        and maintenance() runs.
        """
        if now < self.next_channel_cleanup[0]:
            return
        self.next_channel_cleanup[0] = now + self.adj.cleanup_interval
        self.maintenance()

    def maintenance(self):
        """Kills off dead connections.
        """
        self.kill_zombies()

    def kill_zombies(self):
        """Closes connections that have not had any activity in a while.

        The timeout is configured through adj.channel_timeout (seconds).
        This channel itself and channels with a running task are spared.
        """
        now = time.time()
        cutoff = now - self.adj.channel_timeout
        # values() returns a list copy here, so closing channels (which
        # removes them from active_channels) is safe during the loop.
        for channel in self.active_channels.values():
            if (channel is not self and not channel.running_tasks and
                channel.last_activity < cutoff):
                channel.close()

    def received(self, data):
        """Receives input asynchronously and launches or queues requests.

        One chunk of data may contain the end of one request and the
        start of the next, so parsing loops until the chunk is used up.
        """
        preq = self.proto_request
        while data:
            if preq is None:
                preq = self.parser_class(self.adj)
            n = preq.received(data)
            if preq.completed:
                # The request is ready to use.
                if not preq.empty:
                    self.queue_request(preq)
                preq = None
                self.proto_request = None
            else:
                # Keep the partial request until more data arrives.
                self.proto_request = preq
            if n >= len(data):
                break
            data = data[n:]

    def queue_request(self, req):
        """Queues a request to be processed in sequence by a task.

        If no task is running on this channel, the channel switches to
        synchronous mode and a task is created immediately.
        """
        do_now = 0
        running_lock.acquire()
        try:
            if self.running_tasks:
                # Wait for the current tasks to finish.
                rr = self.ready_requests
                if rr is None:
                    rr = []
                    self.ready_requests = rr
                rr.append(req)
            else:
                # Do it now.
                self.running_tasks = 1
                do_now = 1
        finally:
            running_lock.release()
        # Task creation happens outside the lock.
        if do_now:
            self.set_sync()
            self.create_task(req)

    def handle_error(self):
        """Handles program errors (not communication errors)

        SystemExit and KeyboardInterrupt are propagated; any other
        exception is passed to asyncore's default error handler.
        """
        t = sys.exc_info()[0]
        if t is SystemExit or t is KeyboardInterrupt:
            # A bare raise re-raises the exception currently being
            # handled and keeps its original traceback.
            raise
        asyncore.dispatcher.handle_error(self)

    def handle_comm_error(self):
        """Handles communication errors (not program errors)

        Errors are reported through handle_error() only when
        adj.log_socket_errors is set; otherwise the channel is closed
        silently.
        """
        if self.adj.log_socket_errors:
            self.handle_error()
        else:
            # Ignore socket errors.
            self.close()

    #
    # SYNCHRONOUS METHODS
    #

    def end_task(self, close):
        """Called at the end of a task, may launch another task.

        If 'close' is true the channel is closed once pending output is
        done.  Otherwise the next queued request, if any, is started;
        with nothing queued the channel returns to asynchronous mode.
        """
        if close:
            self.close_when_done()
            return
        new_req = None
        running_lock.acquire()
        try:
            rr = self.ready_requests
            if rr:
                new_req = rr.pop(0)
            else:
                self.running_tasks = 0
        finally:
            running_lock.release()
        if new_req:
            # Respond to the next request.
            self.create_task(new_req)
        else:
            # Wait for another request on this connection.
            self.set_async()

    #
    # BOTH MODES
    #

    def create_task(self, req):
        """Creates a new task and queues it for execution.

        The task may get executed in another thread.
        """
        task = self.task_class(self, req)
        self.server.addTask(task)





class ServerBase (AlternateSocketMapMixin, asyncore.dispatcher):
    """Async. server base for launching derivatives of ServerChannelBase.

    The server owns the listening socket.  Each accepted connection is
    wrapped in a channel_class instance, and tasks created by channels
    are passed to the task dispatcher (or run inline if there is none).
    """

    channel_class = None    # Override with a channel class.
    SERVER_IDENT = 'Zope.Server.ServerBase'  # Override.

    def __init__(self, ip, port, task_dispatcher=None, adj=None, start=1,
                 hit_log=None, verbose=0, socket_map=None):
        """Binds the listening socket on (ip, port).

        adj defaults to default_adj.  When 'start' is true the server
        begins accepting connections right away.
        """
        if adj is None:
            adj = default_adj
        self.adj = adj
        self.socket_map = socket_map
        asyncore.dispatcher.__init__(self)
        self.port = port
        self.task_dispatcher = task_dispatcher
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((ip, port))
        # verbose must be set before computeServerName(), which reads it.
        self.verbose = verbose
        self.hit_log = hit_log
        self.server_name = self.computeServerName(ip)

        if start:
            self.accept_connections()

    def computeServerName(self, ip=''):
        """Returns a host name for this server.

        Uses 'ip' if given, else the local host name.  A value made up
        only of digits and dots is treated as a numeric address and a
        reverse lookup is attempted; if the lookup fails the numeric
        form is returned unchanged.
        """
        if ip:
            server_name = str(ip)
        else:
            server_name = str(socket.gethostname())
        # Convert to a host name if necessary.
        is_hostname = 0
        for c in server_name:
            if c != '.' and not c.isdigit():
                is_hostname = 1
                break
        if not is_hostname:
            if self.verbose:
                self.log_info('Computing hostname', 'info')
            try:
                server_name = socket.gethostbyaddr(server_name)[0]
            except socket.error:
                if self.verbose:
                    self.log_info('Cannot do reverse lookup', 'info')
        return server_name

    def accept_connections(self):
        """Starts listening, using adj.backlog as the listen backlog."""
        self.accepting = 1
        self.socket.listen(self.adj.backlog)  # Circumvent asyncore's NT limit
        if self.verbose:
            self.log_info('%s started.\n'
                          '\tHostname: %s\n\tPort: %d' % (
                self.SERVER_IDENT,
                self.server_name,
                self.port
                ))

    def readable(self):
        """Accept new connections only while below adj.connection_limit.

        This is the only definition of readable(); a second, later
        definition used to override it and disabled the limit check.
        """
        # NOTE(review): this counts asyncore's global socket_map even
        # when an alternate socket_map was supplied -- confirm intended.
        return (self.accepting and
                len(asyncore.socket_map) < self.adj.connection_limit)

    def writable (self):
        # A listening socket never has anything to write.
        return 0

    def handle_read (self):
        pass

    def handle_connect (self):
        pass

    def handle_accept (self):
        """Accepts one connection and wraps it in a channel_class instance."""
        try:
            v = self.accept()
            if v is None:
                return
            conn, addr = v
        except socket.error:
            # Linux: On rare occasions we get a bogus socket back from
            # accept.  socketmodule.c:makesockaddr complains that the
            # address family is unknown.  We don't want the whole server
            # to shut down because of this.
            if self.adj.log_socket_errors:
                self.log_info ('warning: server accept() threw an exception',
                               'warning')
            return
        self.channel_class(self, conn, addr, self.adj, self.socket_map)

    def addTask(self, task):
        """Hands the task to the dispatcher, or services it inline.

        With no task dispatcher configured, task.service() is called
        directly in the calling thread.
        """
        td = self.task_dispatcher
        if td is not None:
            td.addTask(task)
        else:
            task.service()



=== Zope3/lib/python/Zope/Server/Adjustments.py 1.1.2.2 => 1.1.2.3 ===
+# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
 # 
 # This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS


=== Zope3/lib/python/Zope/Server/Buffers.py 1.1.2.1 => 1.1.2.2 ===
+# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
 # 
 # This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 


=== Zope3/lib/python/Zope/Server/Chunking.py 1.1.2.1 => 1.1.2.2 ===
+# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
 # 
 # This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
@@ -9,9 +9,12 @@
 
 
 from Utilities import find_double_newline
+from IStreamConsumer import IStreamConsumer
 
 
 class ChunkedReceiver:
+
+    __implements__ = IStreamConsumer
 
     chunk_remainder = 0
     control_line = ''


=== Zope3/lib/python/Zope/Server/DualModeChannel.py 1.1.2.1 => 1.1.2.2 ===
+# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
 # 
 # This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 
@@ -65,8 +65,7 @@
 
 
 class DualModeChannel (AlternateSocketMapMixin, asyncore.dispatcher):
-    """
-    The channel switches between asynchronous and synchronous mode.
+    """Channel that switches between asynchronous and synchronous mode.
     """
 
     # will_close is set to 1 to close the socket.
@@ -148,6 +147,10 @@
         self.handle_error()
 
     def set_sync(self):
+        """Switches to synchronous mode.
+
+        The main thread will stop calling received().
+        """
         self.async_mode = 0
 
     #
@@ -182,6 +185,10 @@
                 self.socket.setblocking(0)
 
     def set_async(self):
+        """Switches to asynchronous mode.
+
+        The main thread will begin calling received() again.
+        """
         self.async_mode = 1
         self.pull_trigger()
 
@@ -221,7 +228,9 @@
 
 
 class SimultaneousModeChannel (DualModeChannel):
-    """
+    """Layer on top of DualModeChannel that allows communication in
+    both the main thread and other threads at the same time.
+
     The channel operates in synchronous mode with an asynchronous
     helper.  The asynchronous callbacks empty the output buffer
     and fill the input buffer.


=== Zope3/lib/python/Zope/Server/HTTPServer.py 1.1.2.15 => 1.1.2.16 ===
 """
 
-SIMULT_MODE = 0  # Turn on to enable experimental simultaneous channel mode.
-
 import asyncore
 import re
 import socket
@@ -24,16 +22,10 @@
 from medusa.http_date import build_http_date, monthname
 from medusa import logger
 
-if SIMULT_MODE:
-    from DualModeChannel import SimultaneousModeChannel as \
-         channel_base_class
-else:
-    from DualModeChannel import DualModeChannel as channel_base_class
-
-from DualModeChannel import AlternateSocketMapMixin
+from ServerBase import ServerBase, ServerChannelBase, FixedStreamReceiver
 from Buffers import OverflowableBuffer
 from Utilities import find_double_newline
-from Adjustments import default_adj
+from IStreamConsumer import IStreamConsumer
 from IHeaderOutput import IHeaderOutput
 from ITask import ITask
 
@@ -43,19 +35,11 @@
 except ImportError:
     from StringIO import StringIO
 
-from thread import allocate_lock
-
 
-if 1:
-    # Patch asyncore for speed.
-    if hasattr(asyncore.dispatcher, '__getattr__'):
-        del asyncore.dispatcher.__getattr__
 
+class HTTPTask:
+    """An HTTP task accepts a request and writes to a channel.
 
-class http_task:
-    """
-    An HTTP task receives a parsed request and an HTTP channel
-    and is expected to write its response to that channel.
     Subclass this and override the execute() method.
     """
 
@@ -74,7 +58,7 @@
         self.channel = channel
         self.request_data = request_data
         self.response_headers = {
-            'Server': 'Zope.Server.HTTPServer',
+            'Server': channel.server.SERVER_IDENT,
             }
         version = request_data.version
         if version not in ('1.0', '1.1'):
@@ -83,14 +67,12 @@
         self.version = version
 
     def defer(self):
-        """
-        Called when the task will be serviced in a different thread.
+        """Called when the task will be serviced in a different thread.
         """
         pass
 
     def service(self):
-        """
-        Called to execute the task.
+        """Called to execute the task.
         """
         try:
             try:
@@ -105,31 +87,32 @@
             self.channel.end_task(self.close_on_finish)
 
     def cancel(self):
-        """
-        Called when shutting down the server.
+        """Called when shutting down the server.
         """
         self.channel.close_when_done()
 
     def setResponseStatus(self, status, reason):
+        """See the IHeaderOutput interface."""
         self.status = status
         self.reason = reason
 
     def setResponseHeaders(self, mapping):
+        """See the IHeaderOutput interface."""
         self.response_headers.update(mapping)
 
     def appendResponseHeaders(self, lst):
-        """
-        Takes a list of strings.
-        """
+        """See the IHeaderOutput interface."""
         accum = self.accumulated_headers
         if accum is None:
             self.accumulated_headers = accum = []
         accum.extend(lst)
 
     def wroteResponseHeader(self):
+        """See the IHeaderOutput interface."""
         return self.wrote_header
 
     def setAuthUserName(self, name):
+        """See the IHeaderOutput interface."""
         self.auth_user_name = name
 
     def prepareResponseHeaders(self):
@@ -153,6 +136,9 @@
             elif response_headers.has_key ('Transfer-Encoding'):
                 if not response_headers['Transfer-Encoding'] == 'chunked':
                     close_it = 1
+            elif self.status == '304':
+                # Replying with headers only.
+                pass
             elif not response_headers.has_key ('Content-Length'):
                 close_it = 1
         else:
@@ -211,39 +197,14 @@
 
 
 
-class StreamedReceiver:
-
-    completed = 0
-
-    def __init__(self, cl, buf):
-        self.remain = cl
-        self.buf = buf
-
-    def received(self, data):
-        rm = self.remain
-        if rm < 1:
-            self.completed = 1  # Avoid any chance of spinning
-            return 0
-        datalen = len(data)
-        if rm <= datalen:
-            self.buf.append(data[:rm])
-            self.remain = 0
-            self.completed = 1
-            return rm
-        else:
-            self.buf.append(data)
-            self.remain -= datalen
-            return datalen
-
-    def getfile(self):
-        return self.buf.getfile()
-
-
-
 class HTTPRequestParser:
+    """A structure that collects the HTTP request.
+
+    Once the stream is completed, the instance is passed to
+    a server task constructor.
     """
-    A structure that collects the HTTP request.
-    """
+
+    __implements__ = IStreamConsumer
 
     completed = 0  # Set once request is completed.
     empty = 0        # Set if no request was made.
@@ -352,7 +313,7 @@
             self.content_length = cl
             if cl > 0:
                 buf = OverflowableBuffer(self.adj.inbuf_overflow)
-                self.body_rcv = StreamedReceiver(cl, buf)
+                self.body_rcv = FixedStreamReceiver(cl, buf)
 
 
     def get_header_lines(self):
@@ -407,170 +368,8 @@
 
 
 
-# Synchronize access to the "running_tasks" attribute.
-running_lock = allocate_lock()
-
-
-
-class http_channel (channel_base_class):
-    # Note: this class is very reusable for other protocols like FTP
-    # and should probably be turned into an abstract base class
-    # when we approach FTP.
-
-    task_class = http_task
-    active_channels = {}        # Class-specific channel tracker
-    next_channel_cleanup = [0]  # Class-specific cleanup time
-
-    proto_request = None      # An HTTPRequestParser instance
-    ready_requests = None     # A list
-    last_activity = 0         # Time of last activity
-    running_tasks = 0         # boolean
-
-    #
-    # ASYNCHRONOUS METHODS (incl. __init__)
-    #
-
-    def __init__(self, server, conn, addr, adj=None, socket_map=None):
-        channel_base_class.__init__(self, server, conn, addr, adj, socket_map)
-        self.last_activity = t = self.creation_time
-        self.check_maintenance(t)
-
-    def add_channel(self, map=None):
-        """
-        This hook keeps track of opened HTTP channels.
-        """
-        channel_base_class.add_channel(self, map)
-        self.active_channels[self._fileno] = self
-
-    def del_channel(self, map=None):
-        """
-        This hook keeps track of closed HTTP channels.
-        """
-        channel_base_class.del_channel(self, map)
-        ac = self.active_channels
-        fd = self._fileno
-        if ac.has_key(fd):
-            del ac[fd]
-
-    def check_maintenance(self, now):
-        if now < self.next_channel_cleanup[0]:
-            return
-        self.next_channel_cleanup[0] = now + self.adj.cleanup_interval
-        self.maintenance()
-
-    def maintenance(self):
-        # Kill off dead connections.
-        self.kill_zombies()
-
-    def kill_zombies(self):
-        """
-        Closes connections that have not had any activity in
-        (channel_timeout) seconds.
-        """
-        now = time.time()
-        cutoff = now - self.adj.channel_timeout
-        for channel in self.active_channels.values():
-            if (channel is not self and not channel.running_tasks and
-                channel.last_activity < cutoff):
-                channel.close()
-
-    def received(self, data):
-        """
-        Receives input asynchronously and launches or queues requests.
-        """
-        preq = self.proto_request
-        while data:
-            if preq is None:
-                preq = HTTPRequestParser(self.adj)
-            n = preq.received(data)
-            if preq.completed:
-                # The request is ready to use.
-                if not preq.empty:
-                    self.queue_request(preq)
-                preq = None
-                self.proto_request = None
-            else:
-                self.proto_request = preq
-            if n >= len(data):
-                break
-            data = data[n:]
-
-    def queue_request(self, req):
-        """
-        Queues requests to be processed in sequence by tasks.
-        """
-        do_now = 0
-        running_lock.acquire()
-        try:
-            if self.running_tasks:
-                # Wait for the current tasks to finish.
-                rr = self.ready_requests
-                if rr is None:
-                    rr = []
-                    self.ready_requests = rr
-                rr.append(req)
-            else:
-                # Do it now.
-                self.running_tasks = 1
-                do_now = 1
-        finally:
-            running_lock.release()
-        if do_now:
-            self.set_sync()
-            self.create_task(req)
-
-    def handle_error(self):
-        # Program error
-        t, v = sys.exc_info()[:2]
-        if t is SystemExit or t is KeyboardInterrupt:
-            raise t, v
-        asyncore.dispatcher.handle_error(self)
-
-    def handle_comm_error(self):
-        if self.adj.log_socket_errors:
-            self.handle_error()
-        else:
-            # Ignore socket errors.
-            self.close()
-
-    #
-    # SYNCHRONOUS METHODS
-    #
-
-    def end_task(self, close):
-        if close:
-            self.close_when_done()
-            return
-        new_req = None
-        running_lock.acquire()
-        try:
-            rr = self.ready_requests
-            if rr:
-                new_req = rr.pop(0)
-            else:
-                self.running_tasks = 0
-        finally:
-            running_lock.release()
-        if new_req:
-            # Respond to the next request.
-            self.create_task(new_req)
-        else:
-            # Wait for another request on this connection.
-            self.set_async()
-
-    #
-    # BOTH MODES
-    #
-
-    def create_task(self, req):
-        task = self.task_class(self, req)
-        self.server.addTask(task)
-
-
-
 class CommonHitLogger:
-    """
-    Outputs hits in common HTTP log format.
+    """Outputs hits in common HTTP log format.
     """
 
     def __init__(self, logger_object=None, resolver=None):
@@ -644,104 +443,19 @@
             )
 
 
-class http_server (AlternateSocketMapMixin, asyncore.dispatcher):
-
-    channel_class = http_channel
 
-    SERVER_IDENT = 'Zope.Server.HTTPServer.http_server'
+class HTTPServerChannel (ServerChannelBase):
+    task_class = HTTPTask
+    parser_class = HTTPRequestParser
 
-    def __init__(self, ip, port, task_dispatcher=None, adj=None, start=1,
-                 hit_log=None, verbose=0, socket_map=None):
-        if adj is None:
-            adj = default_adj
-        self.adj = adj
-        self.socket_map = socket_map
-        asyncore.dispatcher.__init__(self)
-        self.port = port
-        self.task_dispatcher = task_dispatcher
-        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.set_reuse_addr()
-        self.bind((ip, port))
-        self.verbose = verbose
-        self.hit_log = hit_log
-        self.server_name = self.computeServerName(ip)
-
-        if start:
-            self.accept_connections()
-
-    def computeServerName(self, ip=''):
-        if ip:
-            server_name = str(ip)
-        else:
-            server_name = str(socket.gethostname())
-        # Convert to a host name if necessary.
-        is_hostname = 0
-        for c in server_name:
-            if c != '.' and not c.isdigit():
-                is_hostname = 1
-                break
-        if not is_hostname:
-            if self.verbose:
-                self.log_info('Computing hostname', 'info')
-            try:
-                server_name = socket.gethostbyaddr(server_name)[0]
-            except socket.error:
-                if self.verbose:
-                    self.log_info('Cannot do reverse lookup', 'info')
-        return server_name
-
-    def accept_connections(self):
-        self.accepting = 1
-        self.socket.listen(self.adj.backlog)  # Circumvent asyncore's NT limit
-        if self.verbose:
-            self.log_info('HTTP server started.\n'
-                          '\tHostname: %s\n\tPort: %d' % (
-                self.server_name,
-                self.port
-                ))
-
-    def readable(self):
-        return (self.accepting and
-                len(asyncore.socket_map) < self.adj.connection_limit)
-
-    def writable (self):
-        return 0
-        
-    def handle_read (self):
-        pass
-        
-    def readable (self):
-        return self.accepting
-        
-    def handle_connect (self):
-        pass
-
-    def handle_accept (self):
-        try:
-            v = self.accept()
-            if v is None:
-                return
-            conn, addr = v
-        except socket.error:
-            # Linux: On rare occasions we get a bogus socket back from
-            # accept.  socketmodule.c:makesockaddr complains that the
-            # address family is unknown.  We don't want the whole server
-            # to shut down because of this.
-            if self.adj.log_socket_errors:
-                self.log_info ('warning: server accept() threw an exception',
-                               'warning')
-            return
-        self.channel_class(self, conn, addr, self.adj, self.socket_map)
-
-    def addTask(self, task):
-        td = self.task_dispatcher
-        if td is not None:
-            td.addTask(task)
-        else:
-            task.service()
+    active_channels = {}        # Class-specific channel tracker
+    next_channel_cleanup = [0]  # Class-specific cleanup time
 
 
 
+class HTTPServer (ServerBase):
+    channel_class = HTTPServerChannel
+    SERVER_IDENT = 'Zope.Server.HTTPServer'
 
 
 
@@ -749,7 +463,7 @@
     from TaskThreads import ThreadedTaskDispatcher
     td = ThreadedTaskDispatcher()
     td.setThreadCount(4)
-    http_server('', 8080, task_dispatcher=td)
+    HTTPServer('', 8080, task_dispatcher=td)
     try:
         while 1:
             asyncore.poll(5)


=== Zope3/lib/python/Zope/Server/IHeaderOutput.py 1.1.2.1 => 1.1.2.2 ===
+# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
 # 
 # This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
@@ -12,7 +12,11 @@
 
 
 class IHeaderOutput (Interface):
-    """
+    """Interface for setting HTTP response headers.
+
+    This allows the HTTP server and the application to both set response
+    headers.
+
     Zope.Publisher.HTTP.HTTPResponse is optionally passed an
     object which implements this interface in order to intermingle
     its headers with the HTTP server's response headers,
@@ -20,29 +24,25 @@
     """
 
     def setResponseStatus(status, reason):
-        """
-        Sets the status code and the accompanying message.
+        """Sets the status code and the accompanying message.
         """
 
     def setResponseHeaders(mapping):
-        """
-        Sets headers.  The headers must be Correctly-Cased.
+        """Sets headers.  The headers must be Correctly-Cased.
         """
 
     def appendResponseHeaders(lst):
-        """
-        Sets headers that can potentially repeat.
+        """Sets headers that can potentially repeat.
+
         Takes a list of strings.
         """
 
     def wroteResponseHeader():
-        """
-        Returns a flag indicating whether the response
+        """Returns a flag indicating whether the response
+
         header has already been sent.
         """
 
     def setAuthUserName(name):
-        """
-        Sets the name of the authenticated user so it can
-        be logged.
+        """Sets the name of the authenticated user so the name can be logged.
         """


=== Zope3/lib/python/Zope/Server/ITask.py 1.1.2.1 => 1.1.2.2 ===
+# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
 # 
 # This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS


=== Zope3/lib/python/Zope/Server/PublisherServers.py 1.1.2.7 => 1.1.2.8 ===
+# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
 # 
 # This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 
@@ -10,7 +10,7 @@
 
 from os import path as ospath
 
-from HTTPServer import http_task, http_channel, http_server
+from HTTPServer import HTTPTask, HTTPServerChannel, HTTPServer
 
 from Zope.Publisher.Publish import publish
 from Zope.Publisher.HTTP.HTTPRequest import HTTPRequest
@@ -23,7 +23,7 @@
     }
 
 
-class PublisherHTTPTask (http_task):
+class PublisherHTTPTask (HTTPTask):
 
     def execute(self):
         server = self.channel.server
@@ -95,18 +95,18 @@
 
 
 
-class PublisherHTTPChannel (http_channel):
+class PublisherHTTPChannel (HTTPServerChannel):
 
     task_class = PublisherHTTPTask
 
 
 
-class PublisherHTTPServer (http_server):
+class PublisherHTTPServer (HTTPServer):
 
     channel_class = PublisherHTTPChannel
     
     def __init__(self, request_payload, response_payload, *args, **kw):
         self.request_payload = request_payload
         self.response_payload = response_payload
-        http_server.__init__(self, *args, **kw)
+        HTTPServer.__init__(self, *args, **kw)
 


=== Zope3/lib/python/Zope/Server/TaskThreads.py 1.1.2.6 => 1.1.2.7 ===
+# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
 # 
 # This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
@@ -13,6 +13,8 @@
 from thread import allocate_lock, start_new_thread
 from time import time, sleep
 
+from ITaskDispatcher import ITaskDispatcher
+
 try:
     from zLOG import LOG, ERROR
 except ImportError:
@@ -20,8 +22,11 @@
     ERROR = None
 
 
+
 class ThreadedTaskDispatcher:
 
+    __implements__ = ITaskDispatcher
+
     stop_count = 0  # Number of threads that will stop soon.
 
     def __init__(self):
@@ -116,8 +121,6 @@
             except Empty:
                 pass
 
-    def hasTasks(self):
-        # Inherently non-thread-safe.
-        return not self.queue.empty()
-
+    def getPendingTasksEstimate(self):
+        return self.queue.qsize()
 


=== Zope3/lib/python/Zope/Server/Utilities.py 1.1.2.1 => 1.1.2.2 ===
+# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
 # 
 # This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS 


=== Zope3/lib/python/Zope/Server/ZLogIntegration.py 1.1.2.1 => 1.1.2.2 ===
+# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
 # 
 # This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE.
-"""
-Pokes zLOG default logging into asyncore.
+"""Makes asyncore log to zLOG.
 """
 
 from zLOG import LOG, register_subsystem, BLATHER, INFO, WARNING, ERROR


=== Zope3/lib/python/Zope/Server/__init__.py 1.1.2.3 => 1.1.2.4 ===
+# Copyright 2001-2002 Zope Corporation and Contributors.  All Rights Reserved.
 # 
 # This software is subject to the provisions of the Zope Public License,
-# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS