[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server - HTTPServer2.py:1.1.2.1 PublisherServers.py:1.1.2.1 TaskThreads.py:1.1.2.1 dual_mode_channel.py:1.1.2.1 HTTPResponse.py:1.1.2.7 HTTPServer.py:1.1.2.7

Shane Hathaway <shane@digicool.com>
Wed, 21 Nov 2001 19:22:41 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/Server
In directory cvs.zope.org:/tmp/cvs-serv31156/Server

Modified Files:
      Tag: Zope-3x-branch
	HTTPResponse.py HTTPServer.py 
Added Files:
      Tag: Zope-3x-branch
	HTTPServer2.py PublisherServers.py TaskThreads.py 
	dual_mode_channel.py 
Log Message:
- Created new HTTP server based on Medusa and ZServer.

- Made some corresponding changes to Zope.Publication.

- Got minitest working again.
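
For anyone who wants to try the new server, the __main__ sections of
HTTPServer2.py and PublisherServers.py below show the intended wiring.
A condensed sketch (the package-style imports and the root_object being
published are assumptions for illustration, not part of this checkin):

    import asyncore
    from Zope.Server.TaskThreads import ThreadedTaskDispatcher
    from Zope.Server.PublisherServers import PublisherHTTPServer
    from Zope.Publisher.HTTP.BrowserPayload import BrowserRequestPayload, \
         BrowserResponsePayload
    from Zope.Publisher.DefaultPublication import DefaultPublication

    pub = DefaultPublication(root_object)  # root_object: the thing to publish
    tasks = ThreadedTaskDispatcher()
    tasks.setThreadCount(4)
    PublisherHTTPServer(BrowserRequestPayload(pub), BrowserResponsePayload(),
                        '', 8080, tasks=tasks)
    try:
        asyncore.loop()
    except KeyboardInterrupt:
        tasks.shutdown()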


=== Added File Zope3/lib/python/Zope/Server/HTTPServer2.py ===

# This server uses asyncore to accept connections and do initial
# processing but threads to do work.

SIMULT_MODE = 0

import asyncore
import re
import socket
import sys
import time
from urllib import unquote

from medusa.http_date import build_http_date

if SIMULT_MODE:
    from dual_mode_channel import simultaneous_mode_channel as channel_type
else:
    from dual_mode_channel import dual_mode_channel as channel_type

from dual_mode_channel import synchronous_instream

try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO


if 1:
    # Patch asyncore for speed.
    if hasattr(asyncore.dispatcher, '__getattr__'):
        del asyncore.dispatcher.__getattr__


class http_task:

    # __implements__ = ITask

    instream = None
    close_on_finish = 1
    status_str = '200 Ok'
    wrote_header = 0
    accumulated_headers = None

    def __init__(self, channel, request_header_plus, body_start):
        self.channel = channel
        self.request_header_plus = request_header_plus
        self.body_start = body_start
        self.response_headers = {
            'Server' : 'Zope.Server.HTTPServer',
            'Date'   : build_http_date (time.time())
            }

    def defer(self):
        """
        Called when the task will be serviced in a different thread.
        """
        pass

    def service(self):
        """
        Runs the task and lets the channel know when it has finished.
        """
        try:
            try:
                self.init()
                self.execute()
                self.finish()
            finally:
                self.channel.end_task(self.close_on_finish)
        except socket.error:
            self.channel.handle_comm_error()

    def cancel(self):
        """
        Called when shutting down the server.
        """
        self.channel.kill_task()

    def init(self):
        """
        Parses the request line and headers and sets up the input stream.
        """
        rhp = self.request_header_plus
        index = rhp.find('\r\n')
        if index >= 0:
            first_line = rhp[:index]
            header = rhp[index + 2:]
        else:
            first_line = rhp
            header = ''
        self.first_line = first_line
        self.header = header

        lines = get_header_lines(header)
        self.request_headers = request_headers = {}
        for line in lines:
            index = line.find(':')
            if index > 0:
                key = line[:index]
                value = line[index + 1:].strip()
                key1 = key.upper().replace('-', '_')
                request_headers[key1] = value
            # else there's garbage in the headers?

        channel = self.channel
        request_body_len = int(request_headers.get('CONTENT_LENGTH', 0))
        self.request_body_len = request_body_len
        if request_body_len > 0:
            body_start = self.body_start
            body_end = body_start + request_body_len
            self.instream = synchronous_instream(
                channel, body_start, body_end)
        else:
            self.instream = StringIO('')

        command, uri, version = crack_first_line(self.first_line)
        self.command = str(command)
        if uri and '%' in uri:
            uri = unquote(uri)
        self.uri = str(uri)
        if version not in ('1.0', '1.1'):
            # fall back to a version we support.
            version = '1.0'
        self.version = version

    # setResponseStatus(), setResponseHeaders(), appendResponseHeaders(),
    # and wroteResponseHeader() are part of the IHeaderOutput interface
    # used by Zope.Publisher.HTTP.HTTPResponse.

    def setResponseStatus(self, s):
        self.status_str = s

    def setResponseHeaders(self, mapping):
        self.response_headers.update(mapping)

    def appendResponseHeaders(self, lst):
        """
        Takes a list of strings.
        """
        accum = self.accumulated_headers
        if accum is None:
            self.accumulated_headers = accum = []
        accum.extend(lst)

    def wroteResponseHeader(self):
        return self.wrote_header

    def prepareResponseHeaders(self):
        version = self.version
        # Figure out whether the connection should be closed.
        connection = self.request_headers.get('CONNECTION', None)
        close_on_finish = 1
        
        response_headers = self.response_headers
        if version == '1.0':
            if connection == 'keep-alive':
                if response_headers.has_key('Content-Length'):
                    close_on_finish = 0
                    self.response_headers['Connection'] = 'Keep-Alive'
        elif self.version == '1.1':
            if connection != 'close':
                if response_headers.has_key('Content-Length'):
                    close_on_finish = 0
                else:
                    te = response_headers.get('Transfer-Encoding', None)
                    if te is not None:
                        if te == 'chunked':
                            close_on_finish = 0
        if close_on_finish:
            self.response_headers['Connection'] = 'close'
        else:
            self.close_on_finish = 0

    def buildResponseHeader(self):
        self.prepareResponseHeaders()
        first_line = 'HTTP/%s %s' % (self.version, self.status_str)
        lines = [first_line] + map(
            lambda hv: '%s: %s' % hv, self.response_headers.items())
        accum = self.accumulated_headers
        if accum is not None:
            lines.extend(accum)
        res = '%s\r\n\r\n' % '\r\n'.join(lines)
        return res
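
    # For illustration only (the actual header order depends on dictionary
    # ordering and the Date value varies), buildResponseHeader() returns
    # something like:
    #
    #   'HTTP/1.0 200 Ok\r\nServer: Zope.Server.HTTPServer\r\n'
    #   'Date: Wed, 21 Nov 2001 ...\r\nConnection: close\r\n\r\n'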

    def execute(self):
        """
        Override this.
        """
        body = ("The uri was %s\r\n" % self.uri) * 10
        self.response_headers['Content-Type'] = 'text/plain'
        self.response_headers['Content-Length'] = str(len(body))
        self.write(body)

    def finish(self):
        if not self.wrote_header:
            if not self.response_headers.has_key('Content-Length'):
                self.response_headers['Content-Length'] = '0'
            self.write('')

    def write(self, data):
        channel = self.channel
        if not self.wrote_header:
            rh = self.buildResponseHeader()
            channel.sync_write(rh)
            self.wrote_header = 1
        return channel.sync_write(data)

    def flush(self):
        self.channel.sync_flush()




class http_channel (channel_type):

    max_header_size = 65536  # Ought to be enough

    task_class = http_task

    def after_read_no_task(self):
        self.inbuf.seek(0)
        data = self.inbuf.read()
        if len(data) >= self.max_header_size:
            # Header too big.  DoS?
            self.close_when_done()
            return
        # reading headers for the next task.
        index = data.find('\r\n\r\n')
        if index >= 0:
            header_plus = data[:index]
            body_start = index + 4
            self.inbuf.seek(body_start)

            # Remove preceding blank lines.
            header_plus = header_plus.lstrip()
            if not header_plus:
                # No request was made.
                self.close_when_done()
                return

            task = self.task_class(self, header_plus, body_start)
            self.start_task(task)
            self.server.addTask(task)



class http_server (asyncore.dispatcher):

    channel_class = http_channel

    def __init__(self, ip, port, backlog=5, tasks=None):
        # Creates, binds, and listens on a socket for (ip, port).
        asyncore.dispatcher.__init__(self)
        self.tasks = tasks
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((ip, port))
        self.listen(backlog)

    def writable (self):
        return 0
        
    def handle_read (self):
        pass
        
    def readable (self):
        return self.accepting
        
    def handle_connect (self):
        pass

    def handle_error(self):
        t = sys.exc_info()[0]
        if t is KeyboardInterrupt:
            raise t
        asyncore.dispatcher.handle_error(self)

    def handle_accept (self):
        try:
            conn, addr = self.accept()
        except socket.error:
            # Linux: on rare occasions we get a bogus socket back from
            # accept().  socketmodule.c:makesockaddr complains that the
            # address family is unknown.  We don't want the whole server
            # to shut down because of this.
            self.log_info ('warning: server accept() threw an exception',
                           'warning')
            return
        except TypeError:
            # Unpack non-sequence.  This can happen when a read event
            # fires on a listening socket, but when we call accept()
            # we get EWOULDBLOCK, so dispatcher.accept() returns None.
            # Seen on FreeBSD3.
            self.log_info ('warning: server accept() threw EWOULDBLOCK',
                           'warning')
            return
        self.channel_class(self, conn, addr)

    def addTask(self, task):
        tasks = self.tasks
        if tasks is not None:
            self.tasks.addTask(task)
        else:
            task.service()


first_line_re = re.compile (
    '([^ ]+) (?:[^ :?#]+://[^ ?#/]*)?([^ ]+)(( HTTP/([0-9.]+))$|$)')

def crack_first_line (r):
    m = first_line_re.match (r)
    if m is not None and m.end() == len(r):
        if m.group(3):
            version = m.group(5)
        else:
            version = None
        return m.group(1).upper(), m.group(2), version
    else:
        return None, None, None
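
# For illustration (example request lines, not from this checkin):
#
#   crack_first_line('GET /index.html HTTP/1.1') => ('GET', '/index.html', '1.1')
#   crack_first_line('GET /') => ('GET', '/', None)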


def get_header_lines(header):
    """
    Splits the header into lines, putting multi-line headers together.
    """
    r = []
    lines = header.split('\r\n')
    for line in lines:
        if line and line[0] in ' \t':
            r[-1] = r[-1] + line[1:]
        else:
            r.append(line)
    return r


if __name__ == '__main__':
    from TaskThreads import ThreadedTaskDispatcher
    tasks = ThreadedTaskDispatcher()
    tasks.setThreadCount(4)
    http_server('', 8080, tasks=tasks)
    try:
        asyncore.loop()
    except KeyboardInterrupt:
        print 'shutting down...'
        tasks.shutdown()


=== Added File Zope3/lib/python/Zope/Server/PublisherServers.py ===


from HTTPServer2 import http_task, http_channel, http_server

from Zope.Publisher.Publish import publish
from Zope.Publisher.HTTP.HTTPRequest import HTTPRequest
from Zope.Publisher.HTTP.HTTPResponse import HTTPResponse


class PublisherHTTPTask (http_task):

    def execute(self):
        server = self.channel.server
        resp = HTTPResponse(server.response_payload, self, self)
        req = HTTPRequest(server.request_payload, self.instream,
                          self.request_headers, resp)
        publish(req)



class PublisherHTTPChannel (http_channel):

    task_class = PublisherHTTPTask



class PublisherHTTPServer (http_server):

    channel_class = PublisherHTTPChannel
    
    def __init__(self, request_payload, response_payload,
                 ip, port, backlog=5, tasks=None):
        self.request_payload = request_payload
        self.response_payload = response_payload
        http_server.__init__(self, ip, port, backlog, tasks)

        
if __name__ == '__main__':
    from Zope.Publisher.HTTP.BrowserPayload import BrowserRequestPayload, \
         BrowserResponsePayload
    from Zope.Publisher.DefaultPublication import DefaultPublication

    class c:
        " "
        def __call__(self, URL):
            return 'You invoked URL %s just now.\n' % URL

    ob = c()
    ob.x = c()
    ob.x.y = c()

    pub = DefaultPublication(ob)
    request_payload = BrowserRequestPayload(pub)
    response_payload = BrowserResponsePayload()

    from TaskThreads import ThreadedTaskDispatcher
    tasks = ThreadedTaskDispatcher()
    tasks.setThreadCount(4)
    PublisherHTTPServer(request_payload, response_payload,
                        '', 8080, tasks=tasks)
    try:
        import asyncore
        asyncore.loop()
    except KeyboardInterrupt:
        print 'shutting down...'
        tasks.shutdown()


=== Added File Zope3/lib/python/Zope/Server/TaskThreads.py ===


from Queue import Queue, Empty
from thread import allocate_lock, start_new_thread


class ITask:  # Interface

    def service():
        """
        Services the task.  Either service() or cancel() is called
        for every task queued.
        """

    def cancel():
        """
        Called instead of service() during shutdown or if an
        exception occurs that prevents the task from being
        serviced.  Must return quickly and should not throw exceptions.
        """

    def defer():
        """
        Called just before the task is queued to be executed in
        a different thread.
        """


class ThreadedTaskDispatcher:

    def __init__(self):
        self.threads = {}  # { thread number -> 1 }
        self.queue = Queue()
        self.thread_mgmt_lock = allocate_lock()

    def handlerThread(self, thread_no):
        threads = self.threads
        while threads.has_key(thread_no):
            task = self.queue.get()
            try:
                task.service()
            except:
                # Log somewhere?
                import traceback
                traceback.print_exc()

    def setThreadCount(self, count):
        mlock = self.thread_mgmt_lock
        mlock.acquire()
        try:
            threads = self.threads
            thread_no = 0
            while (len(threads) < count):
                while threads.has_key(thread_no):
                    thread_no = thread_no + 1
                threads[thread_no] = 1
                start_new_thread(self.handlerThread, (thread_no,))
                thread_no = thread_no + 1
            while (len(threads) > count):
                if count == 0:
                    threads.clear()
                else:
                    thread_no = threads.keys()[0]
                    del threads[thread_no]
        finally:
            mlock.release()

    def addTask(self, task):
        try:
            task.defer()
            self.queue.put_nowait(task)
        except:
            task.cancel()
            raise

    def shutdown(self, cancel_pending=1):
        self.setThreadCount(0)
        if cancel_pending:
            try:
                while 1:
                    task = self.queue.get_nowait()
                    task.cancel()
            except Empty:
                pass

    def hasTasks(self):
        # Inherently non-thread-safe.
        return not self.queue.empty()




=== Added File Zope3/lib/python/Zope/Server/dual_mode_channel.py === (424/524 lines abridged)


import asyncore
import socket
import sys
import time

try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

from medusa.thread.select_trigger import trigger

pull_trigger = trigger().pull_trigger




class dual_mode_channel (asyncore.dispatcher):
    """
    The channel switches between asynchronous and synchronous mode.
    """

    # recv_bytes is the argument to pass to socket.recv().
    recv_bytes = 8192

    # outbuf_maxsize specifies the maximum number of bytes the outbuf
    # is allowed to hold before the application starts blocking on output.
    # Raising outbuf_maxsize will improve throughput if you have
    # files larger than outbuf_maxsize being transferred over
    # more concurrent, slow connections than your worker thread count.
    # Expect maximum RAM consumption by outbufs to be
    # at most (number_of_concurrent_connections * outbuf_maxsize),
    # but if you're using ZODB and everyone is downloading the
    # same file then the normal RAM consumption is only a little more
    # than (number of ZODB threads * outbuf_maxsize) because of
    # ConservingStringBuffer.  Also, if ZODB is changed to
    # share strings among threads, normal RAM consumption by outbufs
    # will decrease significantly.
    outbuf_maxsize = 4200000  # About 4 MB
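    # For example (illustrative numbers, not a measurement): 100 slow
    # concurrent connections could tie up roughly 100 * 4.2 MB = 420 MB
    # in outbufs in the worst case.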

    # Create a tempfile if the input data gets larger than inbuf_overflow.
    inbuf_overflow = 525000   # About 0.5 MB
    overflowed = 0

    # will_close is set to 1 to close the socket.
    will_close = 0

    async_mode = 1

[-=- -=- -=- 424 lines omitted -=- -=- -=-]

        Adds bytes to the end of the buffer.
        """
        self.data.append(s)
        self.len = self.len + len(s)

    def get_chunk(self, minbytes=4096, delete=0):
        """
        Returns a string from the start of the buffer, preferring
        at least (minbytes) bytes, optionally deleting that part.
        """
        data = self.data
        if not data:
            return ''
        gotbytes = 0
        for index in range(len(data)):
            gotbytes = gotbytes + len(data[index])
            if gotbytes >= minbytes:
                break
        res = ''.join(data[:index + 1])
        if delete:
            del data[:index + 1]
            self.len = self.len - gotbytes
        return res
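
    # For illustration (assumed buffer contents): if self.data is
    # ['abc', 'defg'], get_chunk(minbytes=4) returns 'abcdefg', since the
    # first string alone (3 bytes) falls short of minbytes.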

    def del_bytes(self, bytes):
        """
        Deletes the given number of bytes from the start of the buffer.
        """
        gotbytes = 0
        data = self.data
        for index in range(len(data)):
            s = data[index]
            slen = len(s)
            gotbytes = gotbytes + slen
            if gotbytes > bytes:
                position = slen - (gotbytes - bytes)
                del data[:index]
                data[0] = s[position:]
                self.len = self.len - bytes
                return
            elif gotbytes == bytes:
                del data[:index + 1]
                self.len = self.len - bytes
                return
        # Hmm, too many!
        raise ValueError, (
            "Can't delete %d bytes from buffer of %d bytes" %
            (bytes, gotbytes))




=== Zope3/lib/python/Zope/Server/HTTPResponse.py 1.1.2.6 => 1.1.2.7 ===
         outstream=self.outstream
         
-        if not self._wrote:
+        if not self._wrote_headers:
             l=self.headers.get('content-length', None)
             if l is not None:
                 try:
@@ -158,7 +158,7 @@
 
             self._streaming=1
             outstream.write(str(self))
-            self._wrote=1
+            self._wrote_headers=1
 
         if not data: return
 


=== Zope3/lib/python/Zope/Server/HTTPServer.py 1.1.2.6 => 1.1.2.7 ===
         env['SERVER_NAME']=server.server_name
         env['SERVER_SOFTWARE']=server.SERVER_IDENT
-        env['SERVER_PROTOCOL']="HTTP/"+request.version
+        env['SERVER_PROTOCOL']="HTTP/%s" % request.version
         env['channel.creation_time']=request.channel.creation_time
         if self.uri_base=='/':
             env['SCRIPT_NAME']=''