[Zope3-checkins] CVS: Zope3/src/zodb/zeo/zrpc - __init__.py:1.2 client.py:1.2 connection.py:1.2 error.py:1.2 log.py:1.2 marshal.py:1.2 server.py:1.2 smac.py:1.2 trigger.py:1.2

Jim Fulton jim@zope.com
Wed, 25 Dec 2002 09:13:55 -0500


Update of /cvs-repository/Zope3/src/zodb/zeo/zrpc
In directory cvs.zope.org:/tmp/cvs-serv15352/src/zodb/zeo/zrpc

Added Files:
	__init__.py client.py connection.py error.py log.py marshal.py 
	server.py smac.py trigger.py 
Log Message:
Grand renaming:

- Renamed most files (especially python modules) to lower case.

- Moved views and interfaces into separate hierarchies within each
  project, where each top-level directory under the zope package
  is a separate project.

- Moved everything to src from lib/python.

  lib/python will eventually go away. I need access to the cvs
  repository to make this happen, however.

There are probably some bits that are broken. All tests pass
and zope runs, but I haven't tried everything. There are a number
of cleanups I'll work on tomorrow.



=== Zope3/src/zodb/zeo/zrpc/__init__.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/zrpc/__init__.py	Wed Dec 25 09:12:23 2002
@@ -0,0 +1,24 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+# zrpc is a package with the following modules
+# client -- manages connection creation to remote server
+# connection -- object dispatcher
+# log -- logging helper
+# error -- exceptions raised by zrpc
+# marshal -- internal, handles basic protocol issues
+# server -- manages incoming connections from remote clients
+# smac -- sized message async connections
+# trigger -- medusa's trigger
+
+# zrpc is not an advertised subpackage of ZEO; its interfaces are internal


=== Zope3/src/zodb/zeo/zrpc/client.py 1.1 => 1.2 === (423/523 lines abridged)
--- /dev/null	Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/zrpc/client.py	Wed Dec 25 09:12:23 2002
@@ -0,0 +1,520 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import errno
+import select
+import socket
+import sys
+import threading
+import time
+import types
+
+from zodb.interfaces import ReadOnlyError
+
+from zodb.zeo import threadedasync
+from zodb.zeo.zrpc import log
+from zodb.zeo.zrpc.trigger import trigger
+from zodb.zeo.zrpc.connection import ManagedConnection
+
+class ConnectionManager:
+    """Keeps a connection up over time"""
+
+    def __init__(self, addrs, client, tmin=1, tmax=180):
+        self.addrlist = self._parse_addrs(addrs)
+        self.client = client
+        self.tmin = tmin
+        self.tmax = tmax
+        self.cond = threading.Condition(threading.Lock())
+        self.connection = None # Protected by self.cond
+        self.closed = 0
+        # If thread is not None, then there is a helper thread
+        # attempting to connect.
+        self.thread = None # Protected by self.cond
+        self.trigger = None
+        self.thr_async = 0
+        threadedasync.register_loop_callback(self.set_async)
+
+    def __repr__(self):

[-=- -=- -=- 423 lines omitted -=- -=- -=-]

+            self.state = "tested"
+        except ReadOnlyError:
+            log.info("CW: ReadOnlyError in testConnection (%s)",
+                      repr(self.addr))
+            self.close()
+            return
+        except:
+            log.error("CW: error in testConnection (%s)", repr(self.addr),
+                      exc_info=True)
+            self.close()
+            return
+        if self.preferred:
+            self.notify_client()
+
+    def notify_client(self):
+        """Call the client's notifyConnected().
+
+        If this succeeds, call the manager's connect_done().
+
+        If the client is already connected, we assume it's a fallback
+        connection, and the new connection must be a preferred
+        connection.  The client will close the old connection.
+        """
+        try:
+            self.client.notifyConnected(self.conn)
+        except:
+            log.error("CW: error in notifyConnected (%s)", repr(self.addr),
+                      exc_info=True)
+            self.close()
+            return
+        self.state = "notified"
+        self.mgr.connect_done(self.conn, self.preferred)
+
+    def close(self):
+        """Close the socket and reset everything."""
+        self.state = "closed"
+        self.mgr = self.client = None
+        self.preferred = 0
+        if self.conn is not None:
+            # Closing the ZRPC connection will eventually close the
+            # socket, somewhere in asyncore.
+            # XXX Why do we care? --Guido
+            self.conn.close()
+            self.conn = None
+        if self.sock is not None:
+            self.sock.close()
+            self.sock = None
+
+    def fileno(self):
+        return self.sock.fileno()


=== Zope3/src/zodb/zeo/zrpc/connection.py 1.1 => 1.2 === (409/509 lines abridged)
--- /dev/null	Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/zrpc/connection.py	Wed Dec 25 09:12:23 2002
@@ -0,0 +1,506 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import asyncore
+import errno
+import select
+import sys
+import threading
+import types
+import logging
+
+from zodb.zeo import threadedasync
+from zodb.zeo.zrpc import smac
+from zodb.zeo.zrpc.error import ZRPCError, DisconnectedError
+from zodb.zeo.zrpc import log
+from zodb.zeo.zrpc.marshal import Marshaller
+from zodb.zeo.zrpc.trigger import trigger
+
+REPLY = ".reply" # message name used for replies
+ASYNC = 1
+
+class Delay:
+    """Used to delay response to client for synchronous calls
+
+    When a synchronous call is made and the original handler returns
+    without handling the call, it returns a Delay object that prevents
+    the mainloop from sending a response.
+    """
+
+    def set_sender(self, msgid, send_reply, return_error):
+        self.msgid = msgid
+        self.send_reply = send_reply
+        self.return_error = return_error
+
+    def reply(self, obj):
+        self.send_reply(self.msgid, obj)
+

[-=- -=- -=- 409 lines omitted -=- -=- -=-]

+        self.mgr = mgr
+        self.__super_init(sock, addr, obj)
+        self.obj.notifyConnected(self)
+
+    def close(self):
+        self.obj.notifyDisconnected()
+        self.mgr.close_conn(self)
+        self.__super_close()
+
+class ManagedConnection(Connection):
+    """Client-side Connection subclass."""
+    __super_init = Connection.__init__
+    __super_close = Connection.close
+
+    def __init__(self, sock, addr, obj, mgr):
+        self.mgr = mgr
+        self.__super_init(sock, addr, obj)
+        self.check_mgr_async()
+
+    # Defer the ThreadedAsync work to the manager.
+
+    def close_trigger(self):
+        # the manager should actually close the trigger
+        del self.trigger
+
+    def set_async(self, map):
+        pass
+
+    def _prepare_async(self):
+        # Don't do the register_loop_callback that the superclass does
+        pass
+
+    def check_mgr_async(self):
+        if not self.thr_async and self.mgr.thr_async:
+            assert self.mgr.trigger is not None, \
+                   "manager (%s) has no trigger" % self.mgr
+            self.thr_async = 1
+            self.trigger = self.mgr.trigger
+            return 1
+        return 0
+
+    def is_async(self):
+        # XXX could the check_mgr_async() be avoided on each test?
+        if self.thr_async:
+            return 1
+        return self.check_mgr_async()
+
+    def close(self):
+        self.mgr.close_conn(self)
+        self.__super_close()


=== Zope3/src/zodb/zeo/zrpc/error.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/zrpc/error.py	Wed Dec 25 09:12:23 2002
@@ -0,0 +1,21 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+from zodb import interfaces
+from zodb.zeo.interfaces import Disconnected
+
+class ZRPCError(interfaces.StorageError):
+    pass
+
+class DisconnectedError(ZRPCError, Disconnected):
+    """The database storage is disconnected from the storage server."""


=== Zope3/src/zodb/zeo/zrpc/log.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/zrpc/log.py	Wed Dec 25 09:12:23 2002
@@ -0,0 +1,93 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import os
+import types
+import threading
+import logging
+
+LOG_THREAD_ID = 0 # Set this to 1 during heavy debugging
+
+_label = "zrpc:%s" % os.getpid()
+
+# The code duplication here is for speed (save a layer of function call).
+
+def critical(msg, *args, **kw):
+    label = _label
+    if LOG_THREAD_ID:
+        label = "%s:%s" % (label, threading.currentThread().getName())
+    logging.critical("%s: "+msg, label, *args, **kw)
+
+def error(msg, *args, **kw):
+    label = _label
+    if LOG_THREAD_ID:
+        label = "%s:%s" % (label, threading.currentThread().getName())
+    logging.error("%s: "+msg, label, *args, **kw)
+
+def warn(msg, *args, **kw):
+    label = _label
+    if LOG_THREAD_ID:
+        label = "%s:%s" % (label, threading.currentThread().getName())
+    logging.warn("%s: "+msg, label, *args, **kw)
+
+def info(msg, *args, **kw):
+    label = _label
+    if LOG_THREAD_ID:
+        label = "%s:%s" % (label, threading.currentThread().getName())
+    logging.info("%s: "+msg, label, *args, **kw)
+
+def debug(msg, *args, **kw):
+    label = _label
+    if LOG_THREAD_ID:
+        label = "%s:%s" % (label, threading.currentThread().getName())
+    logging.debug("%s: "+msg, label, *args, **kw)
+
+REPR_LIMIT = 40
+
+def short_repr(obj):
+    "Return an object repr truncated to about REPR_LIMIT characters."
+
+    # Some of the objects being repr'd are large strings.  It wastes
+    # a lot of memory to repr them and then truncate, so special case
+    # them in this function.
+    # Also handle short repr of a tuple containing a long string.
+
+    # This strategy works well for arguments to StorageServer methods.
+    # The oid is usually first and will get included in its entirety.
+    # The pickle is near the beginning, too, and you can often fit the
+    # module name in the pickle.
+
+    if isinstance(obj, types.StringType):
+        if len(obj) > REPR_LIMIT:
+            r = repr(obj[:REPR_LIMIT])
+        else:
+            r = repr(obj)
+        if len(r) > REPR_LIMIT:
+            r = r[:REPR_LIMIT-4] + '...' + r[-1]
+        return r
+    elif isinstance(obj, types.TupleType):
+        elts = []
+        size = 0
+        for elt in obj:
+            r = repr(elt)
+            elts.append(r)
+            size += len(r)
+            if size > REPR_LIMIT:
+                break
+        r = "(%s)" % (", ".join(elts))
+    else:
+        r = repr(obj)
+    if len(r) > REPR_LIMIT:
+        return r[:REPR_LIMIT] + '...'
+    else:
+        return r


=== Zope3/src/zodb/zeo/zrpc/marshal.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/zrpc/marshal.py	Wed Dec 25 09:12:23 2002
@@ -0,0 +1,65 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import cPickle
+from cStringIO import StringIO
+import types
+
+from zodb.zeo.zrpc.error import ZRPCError
+from zodb.zeo.zrpc import log
+
+class Marshaller:
+    """Marshal requests and replies to send across the network"""
+
+    def encode(self, msgid, flags, name, args):
+        """Returns an encoded message"""
+        # (We used to have a global pickler, but that's not thread-safe. :-( )
+        pickler = cPickle.Pickler()
+        pickler.fast = 1
+        return pickler.dump((msgid, flags, name, args), 1)
+
+    def decode(self, msg):
+        """Decodes msg and returns its parts"""
+        unpickler = cPickle.Unpickler(StringIO(msg))
+        unpickler.find_global = find_global
+
+        try:
+            return unpickler.load() # msgid, flags, name, args
+        except:
+            log.error("can't decode message: %s", log.short_repr(msg))
+            raise
+
+_globals = globals()
+_silly = ('__doc__',)
+
+def find_global(module, name):
+    """Helper for message unpickler"""
+    try:
+        m = __import__(module, _globals, _globals, _silly)
+    except ImportError, msg:
+        raise ZRPCError("import error %s: %s" % (module, msg))
+
+    try:
+        r = getattr(m, name)
+    except AttributeError:
+        raise ZRPCError("module %s has no global %s" % (module, name))
+
+    safe = getattr(r, '__no_side_effects__', 0)
+    if safe:
+        return r
+
+    # XXX what's a better way to do this?  esp w/ 2.1 & 2.2
+    if type(r) == types.ClassType and issubclass(r, Exception):
+        return r
+
+    raise ZRPCError("Unsafe global: %s.%s" % (module, name))


=== Zope3/src/zodb/zeo/zrpc/server.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/zrpc/server.py	Wed Dec 25 09:12:23 2002
@@ -0,0 +1,64 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import asyncore
+import socket
+import types
+
+from zodb.zeo import threadedasync
+from zodb.zeo.zrpc.connection import Connection, Delay
+from zodb.zeo.zrpc import log
+
+# Export the main asyncore loop
+loop = threadedasync.loop
+
+class Dispatcher(asyncore.dispatcher):
+    """A server that accepts incoming RPC connections"""
+    __super_init = asyncore.dispatcher.__init__
+
+    reuse_addr = 1
+
+    def __init__(self, addr, factory=Connection, reuse_addr=None):
+        self.__super_init()
+        self.addr = addr
+        self.factory = factory
+        self.clients = []
+        if reuse_addr is not None:
+            self.reuse_addr = reuse_addr
+        self._open_socket()
+
+    def _open_socket(self):
+        if type(self.addr) == types.TupleType:
+            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        else:
+            self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        self.set_reuse_addr()
+        log.info("listening on %s", str(self.addr))
+        self.bind(self.addr)
+        self.listen(5)
+
+    def writable(self):
+        return 0
+
+    def readable(self):
+        return 1
+
+    def handle_accept(self):
+        try:
+            sock, addr = self.accept()
+        except socket.error, msg:
+            log.info("accepted failed: %s", msg)
+            return
+        c = self.factory(sock, addr)
+        log.info("connect from %s: %s", repr(addr), c)
+        self.clients.append(c)


=== Zope3/src/zodb/zeo/zrpc/smac.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:55 2002
+++ Zope3/src/zodb/zeo/zrpc/smac.py	Wed Dec 25 09:12:23 2002
@@ -0,0 +1,225 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Sized Message Async Connections."""
+
+import asyncore, struct
+import threading
+import socket, errno
+from types import StringType
+
+from zodb.zeo.interfaces import Disconnected
+from zodb.zeo.zrpc import log
+
+# Use the dictionary to make sure we get the minimum number of errno
+# entries.   We expect that EWOULDBLOCK == EAGAIN on most systems --
+# or that only one is actually used.
+
+tmp_dict = {errno.EWOULDBLOCK: 0,
+            errno.EAGAIN: 0,
+            errno.EINTR: 0,
+            }
+expected_socket_read_errors = tuple(tmp_dict.keys())
+
+tmp_dict = {errno.EAGAIN: 0,
+            errno.EWOULDBLOCK: 0,
+            errno.ENOBUFS: 0,
+            errno.EINTR: 0,
+            }
+expected_socket_write_errors = tuple(tmp_dict.keys())
+del tmp_dict
+
+# We chose 60000 as the socket limit by looking at the largest strings
+# that we could pass to send() without blocking.
+SEND_SIZE = 60000
+
+class SizedMessageAsyncConnection(asyncore.dispatcher):
+    __super_init = asyncore.dispatcher.__init__
+    __super_close = asyncore.dispatcher.close
+
+    __closed = 1 # Marker indicating that we're closed
+
+    socket = None # to outwit Sam's getattr
+
+    def __init__(self, sock, addr, map=None, debug=None):
+        self.addr = addr
+        if debug is not None:
+            self._debug = debug
+        elif not hasattr(self, '_debug'):
+            self._debug = __debug__
+        # __input_lock protects __inp, __input_len, __state, __msg_size
+        self.__input_lock = threading.Lock()
+        self.__inp = None # None, a single String, or a list
+        self.__input_len = 0
+        # Instance variables __state and __msg_size work together:
+        #   when __state == 0:
+        #     __msg_size == 4, and the next thing read is a message size;
+        #   when __state == 1:
+        #     __msg_size is variable, and the next thing read is a message.
+        # The next thing read is always of length __msg_size.
+        # The state alternates between 0 and 1.
+        self.__state = 0
+        self.__msg_size = 4
+        self.__output_lock = threading.Lock() # Protects __output
+        self.__output = []
+        self.__closed = 0
+        self.__super_init(sock, map)
+
+    def get_addr(self):
+        return self.addr
+
+    # XXX avoid expensive getattr calls?  Can't remember exactly what
+    # this comment was supposed to mean, but it has something to do
+    # with the way asyncore uses getattr and uses if sock:
+    def __nonzero__(self):
+        return 1
+
+    def handle_read(self):
+        self.__input_lock.acquire()
+        try:
+            # Use a single __inp buffer and integer indexes to make this fast.
+            try:
+                d = self.recv(8192)
+            except socket.error, err:
+                if err[0] in expected_socket_read_errors:
+                    return
+                raise
+            if not d:
+                return
+
+            input_len = self.__input_len + len(d)
+            msg_size = self.__msg_size
+            state = self.__state
+
+            inp = self.__inp
+            if msg_size > input_len:
+                if inp is None:
+                    self.__inp = d
+                elif type(self.__inp) is StringType:
+                    self.__inp = [self.__inp, d]
+                else:
+                    self.__inp.append(d)
+                self.__input_len = input_len
+                return # keep waiting for more input
+
+            # load all previous input and d into single string inp
+            if isinstance(inp, StringType):
+                inp = inp + d
+            elif inp is None:
+                inp = d
+            else:
+                inp.append(d)
+                inp = "".join(inp)
+
+            offset = 0
+            while (offset + msg_size) <= input_len:
+                msg = inp[offset:offset + msg_size]
+                offset = offset + msg_size
+                if not state:
+                    # msg is the 4-byte size header; the message body is next
+                    msg_size = struct.unpack(">i", msg)[0]
+                    state = 1
+                else:
+                    msg_size = 4
+                    state = 0
+                    # XXX We call message_input() with __input_lock
+                    # held!!!  And message_input() may end up calling
+                    # message_output(), which has its own lock.  But
+                    # message_output() cannot call message_input(), so
+                    # the locking order is always consistent, which
+                    # prevents deadlock.  Also, message_input() may
+                    # take a long time, because it can cause an
+                    # incoming call to be handled.  During all this
+                    # time, the __input_lock is held.  That's a good
+                    # thing, because it serializes incoming calls.
+                    self.message_input(msg)
+
+            self.__state = state
+            self.__msg_size = msg_size
+            self.__inp = inp[offset:]
+            self.__input_len = input_len - offset
+        finally:
+            self.__input_lock.release()
+
+    def readable(self):
+        return 1
+
+    def writable(self):
+        if len(self.__output) == 0:
+            return 0
+        else:
+            return 1
+
+    def handle_write(self):
+        self.__output_lock.acquire()
+        try:
+            output = self.__output
+            while output:
+                # Accumulate output into a single string so that we avoid
+                # multiple send() calls, but avoid accumulating too much
+                # data.  If we send a very small string and have more data
+                # to send, we will likely incur delays caused by the
+                # unfortunate interaction between the Nagle algorithm and
+                # delayed acks.  If we send a very large string, only a
+                # portion of it will actually be delivered at a time.
+
+                l = 0
+                for i in range(len(output)):
+                    l += len(output[i])
+                    if l > SEND_SIZE:
+                        break
+
+                i += 1
+                # It is very unlikely that i will be 1.
+                v = "".join(output[:i])
+                del output[:i]
+
+                try:
+                    n = self.send(v)
+                except socket.error, err:
+                    if err[0] in expected_socket_write_errors:
+                        break # we couldn't write anything
+                    raise
+                if n < len(v):
+                    output.insert(0, v[n:])
+                    break # we can't write any more
+        finally:
+            self.__output_lock.release()
+
+    def handle_close(self):
+        self.close()
+
+    def message_output(self, message):
+        if __debug__:
+            if self._debug:
+                log.debug('message_output %d bytes: %s',
+                          len(message), log.short_repr(message))
+
+        if self.__closed:
+            raise Disconnected("Action is temporarily unavailable")
+        self.__output_lock.acquire()
+        try:
+            # do two separate appends to avoid copying the message string
+            self.__output.append(struct.pack(">i", len(message)))
+            if len(message) <= SEND_SIZE:
+                self.__output.append(message)
+            else:
+                for i in range(0, len(message), SEND_SIZE):
+                    self.__output.append(message[i:i+SEND_SIZE])
+        finally:
+            self.__output_lock.release()
+
+    def close(self):
+        if not self.__closed:
+            self.__closed = 1
+            self.__super_close()


=== Zope3/src/zodb/zeo/zrpc/trigger.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:55 2002
+++ Zope3/src/zodb/zeo/zrpc/trigger.py	Wed Dec 25 09:12:23 2002
@@ -0,0 +1,206 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+
+import asyncore
+import os
+import socket
+import sys
+import thread
+import traceback
+
+if os.name == 'posix':
+
+    class trigger(asyncore.file_dispatcher):
+
+        "Wake up a call to select() running in the main thread"
+
+        # This is useful in a context where you are using Medusa's I/O
+        # subsystem to deliver data, but the data is generated by another
+        # thread.  Normally, if Medusa is in the middle of a call to
+        # select(), new output data generated by another thread will have
+        # to sit until the call to select() either times out or returns.
+        # If the trigger is 'pulled' by another thread, it should immediately
+        # generate a READ event on the trigger object, which will force the
+        # select() invocation to return.
+
+        # A common use for this facility: letting Medusa manage I/O for a
+        # large number of connections; but routing each request through a
+        # thread chosen from a fixed-size thread pool.  When a thread is
+        # acquired, a transaction is performed, but output data is
+        # accumulated into buffers that will be emptied more efficiently
+        # by Medusa. [picture a server that can process database queries
+        # rapidly, but doesn't want to tie up threads waiting to send data
+        # to low-bandwidth connections]
+
+        # The other major feature provided by this class is the ability to
+        # move work back into the main thread: if you call pull_trigger()
+        # with a thunk argument, when select() wakes up and receives the
+        # event it will call your thunk from within that thread.  The main
+        # purpose of this is to remove the need to wrap thread locks around
+        # Medusa's data structures, which normally do not need them.  [To see
+        # why this is true, imagine this scenario: A thread tries to push some
+        # new data onto a channel's outgoing data queue at the same time that
+        # the main thread is trying to remove some]
+
+        def __init__(self):
+            r, w = self._fds = os.pipe()
+            self.trigger = w
+            asyncore.file_dispatcher.__init__(self, r)
+            self.lock = thread.allocate_lock()
+            self.thunks = []
+            self._closed = 0
+
+        # Override the asyncore close() method, because it seems that
+        # it would only close the r file descriptor and not w.  The
+        # constructor calls file_dispatcher.__init__ and passes r,
+        # which would get stored in a file_wrapper and get closed by
+        # the default close.  But that would leave w open...
+
+        def close(self):
+            if not self._closed:
+                self._closed = 1
+                self.del_channel()
+                for fd in self._fds:
+                    os.close(fd)
+                self._fds = []
+
+        def __repr__(self):
+            return '<select-trigger (pipe) at %x>' % id(self)
+
+        def readable(self):
+            return 1
+
+        def writable(self):
+            return 0
+
+        def handle_connect(self):
+            pass
+
+        def handle_close(self):
+            self.close()
+
+        def pull_trigger(self, thunk=None):
+            if thunk:
+                self.lock.acquire()
+                try:
+                    self.thunks.append(thunk)
+                finally:
+                    self.lock.release()
+            os.write(self.trigger, 'x')
+
+        def handle_read(self):
+            try:
+                self.recv(8192)
+            except socket.error:
+                return
+            self.lock.acquire()
+            try:
+                for thunk in self.thunks:
+                    try:
+                        thunk()
+                    except:
+                        L = traceback.format_exception(*sys.exc_info())
+                        print 'exception in trigger thunk:\n%s' % "".join(L)
+                self.thunks = []
+            finally:
+                self.lock.release()
+
+else:
+
+    # XXX Should define a base class that has the common methods and
+    # then put the platform-specific parts in a subclass named trigger.
+
+    # win32-safe version
+
+    HOST = '127.0.0.1'
+    MINPORT = 19950
+    NPORTS = 50
+
+    class trigger(asyncore.dispatcher):
+
+        portoffset = 0
+
+        def __init__(self):
+            a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+            # set TCP_NODELAY to true to avoid buffering
+            w.setsockopt(socket.IPPROTO_TCP, 1, 1)
+
+            # tricky: get a pair of connected sockets
+            for i in range(NPORTS):
+                trigger.portoffset = (trigger.portoffset + 1) % NPORTS
+                port = MINPORT + trigger.portoffset
+                address = (HOST, port)
+                try:
+                    a.bind(address)
+                except socket.error:
+                    continue
+                else:
+                    break
+            else:
+                raise RuntimeError, 'Cannot bind trigger!'
+
+            a.listen(1)
+            w.setblocking(0)
+            try:
+                w.connect(address)
+            except:
+                pass
+            r, addr = a.accept()
+            a.close()
+            w.setblocking(1)
+            self.trigger = w
+
+            asyncore.dispatcher.__init__(self, r)
+            self.lock = thread.allocate_lock()
+            self.thunks = []
+            self._trigger_connected = 0
+
+        def __repr__(self):
+            return '<select-trigger (loopback) at %x>' % id(self)
+
+        def readable(self):
+            return 1
+
+        def writable(self):
+            return 0
+
+        def handle_connect(self):
+            pass
+
+        def pull_trigger(self, thunk=None):
+            if thunk:
+                self.lock.acquire()
+                try:
+                    self.thunks.append(thunk)
+                finally:
+                    self.lock.release()
+            self.trigger.send('x')
+
+        def handle_read(self):
+            try:
+                self.recv(8192)
+            except socket.error:
+                return
+            self.lock.acquire()
+            try:
+                for thunk in self.thunks:
+                    try:
+                        thunk()
+                    except:
+                        L = traceback.format_exception(*sys.exc_info())
+                        print 'exception in trigger thunk:\n%s' % "".join(L)
+                self.thunks = []
+            finally:
+                self.lock.release()