[Zope-Checkins] CVS: Zope/lib/python/ZEO/zrpc - __init__.py:1.4.2.1 client.py:1.20.2.1 connection.py:1.38.4.1 error.py:1.4.4.1 log.py:1.6.4.1 marshal.py:1.11.4.1 server.py:1.5.8.1 smac.py:1.35.2.1 trigger.py:1.8.4.1

Chris McDonough chrism@zope.com
Tue, 8 Oct 2002 20:41:43 -0400


Update of /cvs-repository/Zope/lib/python/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv15249/ZEO/zrpc

Added Files:
      Tag: chrism-install-branch
	__init__.py client.py connection.py error.py log.py marshal.py 
	server.py smac.py trigger.py 
Log Message:
Committing ZEO to chrism-install-branch.


=== Added File Zope/lib/python/ZEO/zrpc/__init__.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# zrpc is a package with the following modules
# client -- manages connection creation to remote server
# connection -- object dispatcher
# log -- logging helper
# error -- exceptions raised by zrpc
# marshal -- internal, handles basic protocol issues
# server -- manages incoming connections from remote clients
# smac -- sized message async connections
# trigger -- medusa's trigger

# zrpc is not an advertised subpackage of ZEO; its interfaces are internal


=== Added File Zope/lib/python/ZEO/zrpc/client.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import errno
import select
import socket
import sys
import threading
import time
import types

import ThreadedAsync
import zLOG

from ZODB.POSException import ReadOnlyError

from ZEO.zrpc.log import log
from ZEO.zrpc.trigger import trigger
from ZEO.zrpc.connection import ManagedConnection

class ConnectionManager:
    """Keeps a connection up over time"""

    def __init__(self, addrs, client, tmin=1, tmax=180):
        self.addrlist = self._parse_addrs(addrs)
        self.client = client
        self.tmin = tmin
        self.tmax = tmax
        self.cond = threading.Condition(threading.Lock())
        self.connection = None # Protected by self.cond
        self.closed = 0
        # If thread is not None, then there is a helper thread
        # attempting to connect.
        self.thread = None # Protected by self.cond
        self.trigger = None
        self.thr_async = 0
        ThreadedAsync.register_loop_callback(self.set_async)

    def __repr__(self):
        return "<%s for %s>" % (self.__class__.__name__, self.addrlist)

    def _parse_addrs(self, addrs):
        # Return a list of (addr_type, addr) pairs.

        # For backwards compatibility (and simplicity?) the
        # constructor accepts a single address in the addrs argument --
        # a string for a Unix domain socket or a 2-tuple with a
        # hostname and port.  It can also accept a list of such addresses.

        addr_type = self._guess_type(addrs)
        if addr_type is not None:
            return [(addr_type, addrs)]
        else:
            addrlist = []
            for addr in addrs:
                addr_type = self._guess_type(addr)
                if addr_type is None:
                    raise ValueError, (
                        "unknown address in list: %s" % repr(addr))
                addrlist.append((addr_type, addr))
            return addrlist

    def _guess_type(self, addr):
        if isinstance(addr, types.StringType):
            return socket.AF_UNIX

        if (len(addr) == 2
            and isinstance(addr[0], types.StringType)
            and isinstance(addr[1], types.IntType)):
            return socket.AF_INET

        # not anything I know about
        return None

    def close(self):
        """Prevent ConnectionManager from opening new connections"""
        self.closed = 1
        self.cond.acquire()
        try:
            t = self.thread
            self.thread = None
            conn = self.connection
        finally:
            self.cond.release()
        if t is not None:
            log("CM.close(): stopping and joining thread")
            t.stop()
            t.join(30)
            if t.isAlive():
                log("CM.close(): self.thread.join() timed out",
                    level=zLOG.WARNING)
        if conn is not None:
            # This will call close_conn() below which clears self.connection
            conn.close()
        if self.trigger is not None:
            self.trigger.close()
            self.trigger = None

    def set_async(self, map):
        # This is the callback registered with ThreadedAsync.  The
        # callback might be called multiple times, so it shouldn't
        # create a trigger every time and should never do anything
        # after it's closed.

        # It may be that the only case where it is called multiple
        # times is in the test suite, where ThreadedAsync's loop can
        # be started in a child process after a fork.  Regardless,
        # it's good to be defensive.

        # XXX need each connection started with async==0 to have a
        # callback
        log("CM.set_async(%s)" % repr(map))
        if not self.closed and self.trigger is None:
            log("CM.set_async(): first call")
            self.trigger = trigger()
            self.thr_async = 1 # XXX needs to be set on the Connection

    def attempt_connect(self):
        """Attempt a connection to the server without blocking too long.

        There isn't a crisp definition for too long.  When a
        ClientStorage is created, it attempts to connect to the
        server.  If the server isn't immediately available, it can
        operate from the cache.  This method will start the background
        connection thread and wait a little while to see if it
        finishes quickly.
        """

        # XXX Will a single attempt take too long?
        # XXX Answer: it depends -- normally, you'll connect or get a
        # connection refused error very quickly.  Packet-eating
        # firewalls and other mishaps may cause the connect to take a
        # long time to time out though.  It's also possible that you
        # connect quickly to a slow server, and the attempt includes
        # at least one roundtrip to the server (the register() call).
        # But that's as fast as you can expect it to be.
        self.connect()
        self.cond.acquire()
        try:
            t = self.thread
            conn = self.connection
        finally:
            self.cond.release()
        if t is not None and conn is None:
            event = t.one_attempt
            event.wait()
            self.cond.acquire()
            try:
                conn = self.connection
            finally:
                self.cond.release()
        return conn is not None

    def connect(self, sync=0):
        self.cond.acquire()
        try:
            if self.connection is not None:
                return
            t = self.thread
            if t is None:
                log("CM.connect(): starting ConnectThread")
                self.thread = t = ConnectThread(self, self.client,
                                                self.addrlist,
                                                self.tmin, self.tmax)
                t.start()
            if sync:
                while self.connection is None:
                    self.cond.wait(30)
                    if self.connection is None:
                        log("CM.connect(sync=1): still waiting...")
        finally:
            self.cond.release()
        if sync:
            assert self.connection is not None

    def connect_done(self, conn, preferred):
        # Called by ConnectWrapper.notify_client() after notifying the client
        log("CM.connect_done(preferred=%s)" % preferred)
        self.cond.acquire()
        try:
            self.connection = conn
            if preferred:
                self.thread = None
            self.cond.notifyAll() # Wake up connect(sync=1)
        finally:
            self.cond.release()

    def close_conn(self, conn):
        # Called by the connection when it is closed
        self.cond.acquire()
        try:
            if conn is not self.connection:
                # Closing a non-current connection
                log("CM.close_conn() non-current", level=zLOG.BLATHER)
                return
            log("CM.close_conn()")
            self.connection = None
        finally:
            self.cond.release()
        self.client.notifyDisconnected()
        if not self.closed:
            self.connect()

    def is_connected(self):
        self.cond.acquire()
        try:
            return self.connection is not None
        finally:
            self.cond.release()

# When trying to do a connect on a non-blocking socket, some outcomes
# are expected.  Set _CONNECT_IN_PROGRESS to the errno value(s) expected
# when an initial connect can't complete immediately.  Set _CONNECT_OK
# to the errno value(s) expected if the connect succeeds *or* if it's
# already connected (our code can attempt redundant connects).
if hasattr(errno, "WSAEWOULDBLOCK"):    # Windows
    # XXX The official Winsock docs claim that WSAEALREADY should be
    # treated as yet another "in progress" indicator, but we've never
    # seen this.
    _CONNECT_IN_PROGRESS = (errno.WSAEWOULDBLOCK,)
    # Win98: WSAEISCONN; Win2K: WSAEINVAL
    _CONNECT_OK          = (0, errno.WSAEISCONN, errno.WSAEINVAL)
else:                                   # Unix
    _CONNECT_IN_PROGRESS = (errno.EINPROGRESS,)
    _CONNECT_OK          = (0, errno.EISCONN)

class ConnectThread(threading.Thread):
    """Thread that tries to connect to server given one or more addresses.

    The thread is passed a ConnectionManager and the manager's client
    as arguments.  It calls testConnection() on the client when a
    socket connects; that should return 1 or 0 indicating whether this
    is a preferred or a fallback connection.  It may also raise an
    exception, in which case the connection is abandoned.

    The thread will continue to run, attempting connections, until a
    preferred connection is seen and successfully handed over to the
    manager and client.

    As soon as testConnection() finds a preferred connection, or after
    all sockets have been tried and at least one fallback connection
    has been seen, notifyConnected(connection) is called on the client
    and connect_done() on the manager.  If this was a preferred
    connection, the thread then exits; otherwise, it keeps trying
    until it gets a preferred connection, and then reconnects the
    client using that connection.

    """

    __super_init = threading.Thread.__init__

    # We don't expect clients to call any methods of this Thread other
    # than close() and those defined by the Thread API.

    def __init__(self, mgr, client, addrlist, tmin, tmax):
        self.__super_init(name="Connect(%s)" % addrlist)
        self.mgr = mgr
        self.client = client
        self.addrlist = addrlist
        self.tmin = tmin
        self.tmax = tmax
        self.stopped = 0
        self.one_attempt = threading.Event()
        # A ConnectThread keeps track of whether it has finished a
        # call to try_connecting().  This allows the ConnectionManager
        # to make an attempt to connect right away, but not block for
        # too long if the server isn't immediately available.

    def stop(self):
        self.stopped = 1

    def run(self):
        delay = self.tmin
        success = 0
        while not self.stopped:
            success = self.try_connecting()
            if not self.one_attempt.isSet():
                self.one_attempt.set()
            if success > 0:
                break
            time.sleep(delay)
            delay = min(delay*2, self.tmax)
        log("CT: exiting thread: %s" % self.getName())

    def try_connecting(self):
        """Try connecting to all self.addrlist addresses.

        Return 1 if a preferred connection was found; 0 if no
        connection was found; and -1 if a fallback connection was
        found.
        """

        log("CT: attempting to connect on %d sockets" % len(self.addrlist))

        # Create socket wrappers
        wrappers = {}  # keys are active wrappers
        for domain, addr in self.addrlist:
            wrap = ConnectWrapper(domain, addr, self.mgr, self.client)
            wrap.connect_procedure()
            if wrap.state == "notified":
                for wrap in wrappers.keys():
                    wrap.close()
                return 1
            if wrap.state != "closed":
                wrappers[wrap] = wrap

        # Next wait until they all actually connect (or fail)
        # XXX If a sockets never connects, nor fails, we'd wait forever!
        while wrappers:
            if self.stopped:
                for wrap in wrappers.keys():
                    wrap.close()
                return 0
            # Select connecting wrappers
            connecting = [wrap
                          for wrap in wrappers.keys()
                          if wrap.state == "connecting"]
            if not connecting:
                break
            try:
                r, w, x = select.select([], connecting, connecting, 1.0)
            except select.error, msg:
                log("CT: select failed; msg=%s" % str(msg),
                    level=zLOG.WARNING) # XXX Is this the right level?
                continue
            # Exceptable wrappers are in trouble; close these suckers
            for wrap in x:
                log("CT: closing troubled socket %s" % str(wrap.addr))
                del wrappers[wrap]
                wrap.close()
            # Writable sockets are connected
            for wrap in w:
                wrap.connect_procedure()
                if wrap.state == "notified":
                    del wrappers[wrap] # Don't close this one
                    for wrap in wrappers.keys():
                        wrap.close()
                    return 1
                if wrap.state == "closed":
                    del wrappers[wrap]

        # If we've got wrappers left at this point, they're fallback
        # connections.  Try notifying them until one succeeds.
        for wrap in wrappers.keys():
            assert wrap.state == "tested" and wrap.preferred == 0
            if self.mgr.is_connected():
                wrap.close()
            else:
                wrap.notify_client()
                if wrap.state == "notified":
                    del wrappers[wrap] # Don't close this one
                    for wrap in wrappers.keys():
                        wrap.close()
                    return -1
            assert wrap.state == "closed"
            del wrappers[wrap]

        # Alas, no luck.
        assert not wrappers
        return 0

class ConnectWrapper:
    """An object that handles the connection procedure for one socket.

    This is a little state machine with states:
        closed
        opened
        connecting
        connected
        tested
        notified
    """

    def __init__(self, domain, addr, mgr, client):
        """Store arguments and create non-blocking socket."""
        self.domain = domain
        self.addr = addr
        self.mgr = mgr
        self.client = client
        # These attributes are part of the interface
        self.state = "closed"
        self.sock = None
        self.conn = None
        self.preferred = 0
        log("CW: attempt to connect to %s" % repr(addr))
        try:
            self.sock = socket.socket(domain, socket.SOCK_STREAM)
        except socket.error, err:
            log("CW: can't create socket, domain=%s: %s" % (domain, err),
                level=zLOG.ERROR)
            self.close()
            return
        self.sock.setblocking(0)
        self.state = "opened"

    def connect_procedure(self):
        """Call sock.connect_ex(addr) and interpret result."""
        if self.state in ("opened", "connecting"):
            try:
                err = self.sock.connect_ex(self.addr)
            except socket.error, msg:
                log("CW: connect_ex(%r) failed: %s" % (self.addr, msg),
                    level=zLOG.ERROR)
                self.close()
                return
            log("CW: connect_ex(%s) returned %s" %
                (self.addr, errno.errorcode.get(err) or str(err)))
            if err in _CONNECT_IN_PROGRESS:
                self.state = "connecting"
                return
            if err not in _CONNECT_OK:
                log("CW: error connecting to %s: %s" %
                    (self.addr, errno.errorcode.get(err) or str(err)),
                    level=zLOG.WARNING)
                self.close()
                return
            self.state = "connected"
        if self.state == "connected":
            self.test_connection()

    def test_connection(self):
        """Establish and test a connection at the zrpc level.

        Call the client's testConnection(), giving the client a chance
        to do app-level check of the connection.
        """
        self.conn = ManagedConnection(self.sock, self.addr,
                                      self.client, self.mgr)
        self.sock = None # The socket is now owned by the connection
        try:
            self.preferred = self.client.testConnection(self.conn)
            self.state = "tested"
        except ReadOnlyError:
            log("CW: ReadOnlyError in testConnection (%s)" % repr(self.addr))
            self.close()
            return
        except:
            log("CW: error in testConnection (%s)" % repr(self.addr),
                level=zLOG.ERROR, error=sys.exc_info())
            self.close()
            return
        if self.preferred:
            self.notify_client()

    def notify_client(self):
        """Call the client's notifyConnected().

        If this succeeds, call the manager's connect_done().

        If the client is already connected, we assume it's a fallback
        connection, and the new connection must be a preferred
        connection.  The client will close the old connection.
        """
        try:
            self.client.notifyConnected(self.conn)
        except:
            log("CW: error in notifyConnected (%s)" % repr(self.addr),
                level=zLOG.ERROR, error=sys.exc_info())
            self.close()
            return
        self.state = "notified"
        self.mgr.connect_done(self.conn, self.preferred)

    def close(self):
        """Close the socket and reset everything."""
        self.state = "closed"
        self.mgr = self.client = None
        self.preferred = 0
        if self.conn is not None:
            # Closing the ZRPC connection will eventually close the
            # socket, somewhere in asyncore.
            # XXX Why do we care? --Guido
            self.conn.close()
            self.conn = None
        if self.sock is not None:
            self.sock.close()
            self.sock = None

    def fileno(self):
        return self.sock.fileno()


=== Added File Zope/lib/python/ZEO/zrpc/connection.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import errno
import select
import sys
import threading
import types

import ThreadedAsync
from ZEO.zrpc import smac
from ZEO.zrpc.error import ZRPCError, DisconnectedError
from ZEO.zrpc.log import log, short_repr
from ZEO.zrpc.marshal import Marshaller
from ZEO.zrpc.trigger import trigger
import zLOG
from ZODB import POSException

REPLY = ".reply" # message name used for replies
ASYNC = 1

class Delay:
    """Used to delay response to client for synchronous calls

    When a synchronous call is made and the original handler returns
    without handling the call, it returns a Delay object that prevents
    the mainloop from sending a response.
    """

    def set_sender(self, msgid, send_reply, return_error):
        self.msgid = msgid
        self.send_reply = send_reply
        self.return_error = return_error

    def reply(self, obj):
        self.send_reply(self.msgid, obj)

    def error(self, exc_info):
        log("Error raised in delayed method", zLOG.ERROR, error=exc_info)
        self.return_error(self.msgid, 0, *exc_info[:2])

class MTDelay(Delay):

    def __init__(self):
        self.ready = threading.Event()

    def set_sender(self, msgid, send_reply, return_error):
        Delay.set_sender(self, msgid, send_reply, return_error)
        self.ready.set()

    def reply(self, obj):
        self.ready.wait()
        Delay.reply(self, obj)

    def error(self, exc_info):
        self.ready.wait()
        Delay.error(self, exc_info)

class Connection(smac.SizedMessageAsyncConnection):
    """Dispatcher for RPC on object on both sides of socket.

    The connection supports synchronous calls, which expect a return,
    and asynchronous calls, which do not.

    It uses the Marshaller class to handle encoding and decoding of
    method calls and arguments.  Marshaller uses pickle to encode
    arbitrary Python objects.  The code here doesn't ever see the wire
    format.

    A Connection is designed for use in a multithreaded application,
    where a synchronous call must block until a response is ready.

    A socket connection between a client and a server allows either
    side to invoke methods on the other side.  The processes on each
    end of the socket use a Connection object to manage communication.

    The Connection deals with decoded RPC messages.  They are
    represented as four-tuples containing: msgid, flags, method name,
    and a tuple of method arguments.

    The msgid starts at zero and is incremented by one each time a
    method call message is sent.  Each side of the connection has a
    separate msgid state.

    When one side of the connection (the client) calls a method, it
    sends a message with a new msgid.  The other side (the server),
    replies with a message that has the same msgid, the string
    ".reply" (the global variable REPLY) as the method name, and the
    actual return value in the args position.  Note that each side of
    the Connection can initiate a call, in which case it will be the
    client for that particular call.

    The protocol also supports asynchronous calls.  The client does
    not wait for a return value for an asynchronous call.  The only
    defined flag is ASYNC.  If a method call message has the ASYNC
    flag set, the server will raise an exception.

    If a method call raises an Exception, the exception is propagated
    back to the client via the REPLY message.  The client side will
    raise any exception it receives instead of returning the value to
    the caller.
    """

    __super_init = smac.SizedMessageAsyncConnection.__init__
    __super_close = smac.SizedMessageAsyncConnection.close

    protocol_version = "Z200"

    def __init__(self, sock, addr, obj=None):
        self.obj = None
        self.marshal = Marshaller()
        self.closed = 0
        self.msgid = 0
        self.__super_init(sock, addr)
        # A Connection either uses asyncore directly or relies on an
        # asyncore mainloop running in a separate thread.  If
        # thr_async is true, then the mainloop is running in a
        # separate thread.  If thr_async is true, then the asyncore
        # trigger (self.trigger) is used to notify that thread of
        # activity on the current thread.
        self.thr_async = 0
        self.trigger = None
        self._prepare_async()
        self._map = {self._fileno: self}
        # __msgid_lock guards access to msgid
        self.msgid_lock = threading.Lock()
        # __replies_cond is used to block when a synchronous call is
        # waiting for a response
        self.replies_cond = threading.Condition()
        self.replies = {}
        self.register_object(obj)
        self.handshake()

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.addr)

    __str__ = __repr__ # Defeat asyncore's dreaded __getattr__

    def close(self):
        if self.closed:
            return
        self._map.clear()
        self.closed = 1
        self.close_trigger()
        self.__super_close()

    def close_trigger(self):
        # overridden by ManagedConnection
        if self.trigger is not None:
            self.trigger.close()

    def register_object(self, obj):
        """Register obj as the true object to invoke methods on"""
        self.obj = obj

    def handshake(self):
        # When a connection is created the first message sent is a
        # 4-byte protocol version.  This mechanism should allow the
        # protocol to evolve over time, and let servers handle clients
        # using multiple versions of the protocol.

        # The mechanism replaces the message_input() method for the
        # first message received.

        # The client sends the protocol version it is using.
        self._message_input = self.message_input
        self.message_input = self.recv_handshake
        self.message_output(self.protocol_version)

    def recv_handshake(self, message):
        if message == self.protocol_version:
            self.message_input = self._message_input
        else:
            log("recv_handshake: bad handshake %s" % short_repr(message),
                level=zLOG.ERROR)
        # otherwise do something else...

    def message_input(self, message):
        """Decoding an incoming message and dispatch it"""
        # If something goes wrong during decoding, the marshaller
        # will raise an exception.  The exception will ultimately
        # result in asycnore calling handle_error(), which will
        # close the connection.
        msgid, flags, name, args = self.marshal.decode(message)

        if __debug__:
            log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
                                              short_repr(args)),
                level=zLOG.TRACE)
        if name == REPLY:
            self.handle_reply(msgid, flags, args)
        else:
            self.handle_request(msgid, flags, name, args)

    def handle_reply(self, msgid, flags, args):
        if __debug__:
            log("recv reply: %s, %s, %s" % (msgid, flags, short_repr(args)),
                level=zLOG.DEBUG)
        self.replies_cond.acquire()
        try:
            self.replies[msgid] = flags, args
            self.replies_cond.notifyAll()
        finally:
            self.replies_cond.release()

    def handle_request(self, msgid, flags, name, args):
        if not self.check_method(name):
            msg = "Invalid method name: %s on %s" % (name, repr(self.obj))
            raise ZRPCError(msg)
        if __debug__:
            log("calling %s%s" % (name, short_repr(args)), level=zLOG.BLATHER)

        meth = getattr(self.obj, name)
        try:
            ret = meth(*args)
        except (SystemExit, KeyboardInterrupt):
            raise
        except Exception, msg:
            error = sys.exc_info()
            log("%s() raised exception: %s" % (name, msg), zLOG.INFO,
                error=error)
            error = error[:2]
            return self.return_error(msgid, flags, *error)

        if flags & ASYNC:
            if ret is not None:
                raise ZRPCError("async method %s returned value %s" %
                                (name, short_repr(ret)))
        else:
            if __debug__:
                log("%s returns %s" % (name, short_repr(ret)), zLOG.DEBUG)
            if isinstance(ret, Delay):
                ret.set_sender(msgid, self.send_reply, self.return_error)
            else:
                self.send_reply(msgid, ret)

    def handle_error(self):
        if sys.exc_info()[0] == SystemExit:
            raise sys.exc_info()
        self.log_error("Error caught in asyncore")
        self.close()

    def log_error(self, msg="No error message supplied"):
        log(msg, zLOG.ERROR, error=sys.exc_info())

    def check_method(self, name):
        # XXX Is this sufficient "security" for now?
        if name.startswith('_'):
            return None
        return hasattr(self.obj, name)

    def send_reply(self, msgid, ret):
        try:
            msg = self.marshal.encode(msgid, 0, REPLY, ret)
        except self.marshal.errors:
            try:
                r = short_repr(ret)
            except:
                r = "<unreprable>"
            err = ZRPCError("Couldn't pickle return %.100s" % r)
            msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
        self.message_output(msg)
        self.poll()

    def return_error(self, msgid, flags, err_type, err_value):
        if flags & ASYNC:
            self.log_error("Asynchronous call raised exception: %s" % self)
            return
        if type(err_value) is not types.InstanceType:
            err_value = err_type, err_value

        try:
            msg = self.marshal.encode(msgid, 0, REPLY, (err_type, err_value))
        except self.marshal.errors:
            try:
                r = short_repr(err_value)
            except:
                r = "<unreprable>"
            err = ZRPCError("Couldn't pickle error %.100s" % r)
            msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
        self.message_output(msg)
        self.poll()

    # The next two public methods (call and callAsync) are used by
    # clients to invoke methods on remote objects

    def send_call(self, method, args, flags):
        # send a message and return its msgid
        self.msgid_lock.acquire()
        try:
            msgid = self.msgid
            self.msgid = self.msgid + 1
        finally:
            self.msgid_lock.release()
        if __debug__:
            log("send msg: %d, %d, %s, ..." % (msgid, flags, method),
                zLOG.TRACE)
        buf = self.marshal.encode(msgid, flags, method, args)
        self.message_output(buf)
        return msgid

    def call(self, method, *args):
        if self.closed:
            raise DisconnectedError()
        msgid = self.send_call(method, args, 0)
        r_flags, r_args = self.wait(msgid)
        if (isinstance(r_args, types.TupleType)
            and type(r_args[0]) == types.ClassType
            and issubclass(r_args[0], Exception)):
            inst = r_args[1]
            raise inst # error raised by server
        else:
            return r_args

    def callAsync(self, method, *args):
        if self.closed:
            raise DisconnectedError()
        self.send_call(method, args, ASYNC)
        self.poll()

    # handle IO, possibly in async mode

    def _prepare_async(self):
        self.thr_async = 0
        ThreadedAsync.register_loop_callback(self.set_async)
        # XXX If we are not in async mode, this will cause dead
        # Connections to be leaked.

    def set_async(self, map):
        self.trigger = trigger()
        self.thr_async = 1

    def is_async(self):
        # overridden for ManagedConnection
        if self.thr_async:
            return 1
        else:
            return 0

    def wait(self, msgid):
        """Invoke asyncore mainloop and wait for reply."""
        if __debug__:
            log("wait(%d), async=%d" % (msgid, self.is_async()),
                level=zLOG.TRACE)
        if self.is_async():
            self.trigger.pull_trigger()

        # Delay used when we call asyncore.poll() directly.
        # Start with a 1 msec delay, double until 1 sec.
        delay = 0.001

        self.replies_cond.acquire()
        try:
            while 1:
                if self.closed:
                    raise DisconnectedError()
                reply = self.replies.get(msgid)
                if reply is not None:
                    del self.replies[msgid]
                    if __debug__:
                        log("wait(%d): reply=%s" % (msgid, short_repr(reply)),
                            level=zLOG.DEBUG)
                    return reply
                if self.is_async():
                    self.replies_cond.wait(10.0)
                else:
                    self.replies_cond.release()
                    try:
                        try:
                            if __debug__:
                                log("wait(%d): asyncore.poll(%s)" %
                                    (msgid, delay), level=zLOG.TRACE)
                            asyncore.poll(delay, self._map)
                            if delay < 1.0:
                                delay += delay
                        except select.error, err:
                            log("Closing.  asyncore.poll() raised %s." % err,
                                level=zLOG.BLATHER)
                            self.close()
                    finally:
                        self.replies_cond.acquire()
        finally:
            self.replies_cond.release()

    def poll(self):
        """Invoke asyncore mainloop to get pending message out."""
        if __debug__:
            log("poll(), async=%d" % self.is_async(), level=zLOG.TRACE)
        if self.is_async():
            self.trigger.pull_trigger()
        else:
            asyncore.poll(0.0, self._map)

    def pending(self):
        """Invoke mainloop until any pending messages are handled."""
        if __debug__:
            log("pending(), async=%d" % self.is_async(), level=zLOG.TRACE)
        if self.is_async():
            return
        # Inline the asyncore poll() function to know whether any input
        # was actually read.  Repeat until no input is ready.
        # XXX This only does reads.
        r_in = [self._fileno]
        w_in = []
        x_in = []
        while 1:
            try:
                r, w, x = select.select(r_in, w_in, x_in, 0)
            except select.error, err:
                if err[0] == errno.EINTR:
                    continue
                else:
                    raise
            if not r:
                break
            try:
                self.handle_read_event()
            except asyncore.ExitNow:
                raise
            except:
                self.handle_error()

class ManagedServerConnection(Connection):
    """Server-side Connection subclass."""
    __super_init = Connection.__init__
    __super_close = Connection.close

    def __init__(self, sock, addr, obj, mgr):
        self.mgr = mgr
        self.__super_init(sock, addr, obj)
        self.obj.notifyConnected(self)

    def close(self):
        self.obj.notifyDisconnected()
        self.mgr.close_conn(self)
        self.__super_close()

class ManagedConnection(Connection):
    """Client-side Connection subclass."""
    __super_init = Connection.__init__
    __super_close = Connection.close

    def __init__(self, sock, addr, obj, mgr):
        self.mgr = mgr
        self.__super_init(sock, addr, obj)
        self.check_mgr_async()

    # Defer the ThreadedAsync work to the manager.

    def close_trigger(self):
        # the manager should actually close the trigger
        del self.trigger

    def set_async(self, map):
        pass

    def _prepare_async(self):
        # Don't do the register_loop_callback that the superclass does
        pass

    def check_mgr_async(self):
        if not self.thr_async and self.mgr.thr_async:
            assert self.mgr.trigger is not None, \
                   "manager (%s) has no trigger" % self.mgr
            self.thr_async = 1
            self.trigger = self.mgr.trigger
            return 1
        return 0

    def is_async(self):
        # XXX could the check_mgr_async() be avoided on each test?
        if self.thr_async:
            return 1
        return self.check_mgr_async()

    def close(self):
        self.mgr.close_conn(self)
        self.__super_close()


=== Added File Zope/lib/python/ZEO/zrpc/error.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from ZODB import POSException
from ZEO.Exceptions import Disconnected

class ZRPCError(POSException.StorageError):
    pass

class DisconnectedError(ZRPCError, Disconnected):
    """The database storage is disconnected from the storage server."""


=== Added File Zope/lib/python/ZEO/zrpc/log.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
import types
import zLOG
import threading

LOG_THREAD_ID = 0 # Set this to 1 during heavy debugging

_label = "zrpc:%s" % os.getpid()

def new_label():
    global _label
    _label = "zrpc:%s" % os.getpid()

def log(message, level=zLOG.BLATHER, label=None, error=None):
    label = label or _label
    if LOG_THREAD_ID:
        label = "%s:%s" % (label, threading.currentThread().getName())
    zLOG.LOG(label, level, message, error=error)

REPR_LIMIT = 40

def short_repr(obj):
    "Return an object repr limited to REPR_LIMIT bytes."

    # Some of the objects being repr'd are large strings.  It's wastes
    # a lot of memory to repr them and then truncate, so special case
    # them in this function.
    # Also handle short repr of a tuple containing a long string.

    # This strategy works well for arguments to StorageServer methods.
    # The oid is usually first and will get included in its entirety.
    # The pickle is near the beginning, too, and you can often fit the
    # module name in the pickle.

    if isinstance(obj, types.StringType):
        if len(obj) > REPR_LIMIT:
            r = repr(obj[:REPR_LIMIT])
        else:
            r = repr(obj)
        if len(r) > REPR_LIMIT:
            r = r[:REPR_LIMIT-4] + '...' + r[-1]
        return r
    elif isinstance(obj, types.TupleType):
        elts = []
        size = 0
        for elt in obj:
            r = repr(elt)
            elts.append(r)
            size += len(r)
            if size > REPR_LIMIT:
                break
        r = "(%s)" % (", ".join(elts))
    else:
        r = repr(obj)
    if len(r) > REPR_LIMIT:
        return r[:REPR_LIMIT] + '...'
    else:
        return r


=== Added File Zope/lib/python/ZEO/zrpc/marshal.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import cPickle
from cStringIO import StringIO
import types

import zLOG

from ZEO.zrpc.error import ZRPCError
from ZEO.zrpc.log import log, short_repr

class Marshaller:
    """Marshal requests and replies to second across network"""

    def encode(self, msgid, flags, name, args):
        """Returns an encoded message"""
        # (We used to have a global pickler, but that's not thread-safe. :-( )
        pickler = cPickle.Pickler()
        pickler.fast = 1
        return pickler.dump((msgid, flags, name, args), 1)

    def decode(self, msg):
        """Decodes msg and returns its parts"""
        unpickler = cPickle.Unpickler(StringIO(msg))
        unpickler.find_global = find_global

        try:
            return unpickler.load() # msgid, flags, name, args
        except:
            log("can't decode message: %s" % short_repr(msg), level=zLOG.ERROR)
            raise

_globals = globals()
_silly = ('__doc__',)

def find_global(module, name):
    """Helper for message unpickler"""
    try:
        m = __import__(module, _globals, _globals, _silly)
    except ImportError, msg:
        raise ZRPCError("import error %s: %s" % (module, msg))

    try:
        r = getattr(m, name)
    except AttributeError:
        raise ZRPCError("module %s has no global %s" % (module, name))

    safe = getattr(r, '__no_side_effects__', 0)
    if safe:
        return r

    # XXX what's a better way to do this?  esp w/ 2.1 & 2.2
    if type(r) == types.ClassType and issubclass(r, Exception):
        return r

    raise ZRPCError("Unsafe global: %s.%s" % (module, name))


=== Added File Zope/lib/python/ZEO/zrpc/server.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import socket
import types

from ZEO.zrpc.connection import Connection, Delay
from ZEO.zrpc.log import log

# Export the main asyncore loop
loop = asyncore.loop

class Dispatcher(asyncore.dispatcher):
    """A server that accepts incoming RPC connections"""
    __super_init = asyncore.dispatcher.__init__

    reuse_addr = 1

    def __init__(self, addr, factory=Connection, reuse_addr=None):
        self.__super_init()
        self.addr = addr
        self.factory = factory
        self.clients = []
        if reuse_addr is not None:
            self.reuse_addr = reuse_addr
        self._open_socket()

    def _open_socket(self):
        if type(self.addr) == types.TupleType:
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.set_reuse_addr()
        log("listening on %s" % str(self.addr))
        self.bind(self.addr)
        self.listen(5)

    def writable(self):
        return 0

    def readable(self):
        return 1

    def handle_accept(self):
        try:
            sock, addr = self.accept()
        except socket.error, msg:
            log("accepted failed: %s" % msg)
            return
        c = self.factory(sock, addr)
        log("connect from %s: %s" % (repr(addr), c))
        self.clients.append(c)


=== Added File Zope/lib/python/ZEO/zrpc/smac.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Sized Message Async Connections."""

import asyncore, struct
import threading
from ZEO.Exceptions import Disconnected
import zLOG
from types import StringType

from ZEO.zrpc.log import log, short_repr

import socket, errno

# Use the dictionary to make sure we get the minimum number of errno
# entries.   We expect that EWOULDBLOCK == EAGAIN on most systems --
# or that only one is actually used.

tmp_dict = {errno.EWOULDBLOCK: 0,
            errno.EAGAIN: 0,
            errno.EINTR: 0,
            }
expected_socket_read_errors = tuple(tmp_dict.keys())

tmp_dict = {errno.EAGAIN: 0,
            errno.EWOULDBLOCK: 0,
            errno.ENOBUFS: 0,
            errno.EINTR: 0,
            }
expected_socket_write_errors = tuple(tmp_dict.keys())
del tmp_dict

# We chose 60000 as the socket limit by looking at the largest strings
# that we could pass to send() without blocking.
SEND_SIZE = 60000

class SizedMessageAsyncConnection(asyncore.dispatcher):
    __super_init = asyncore.dispatcher.__init__
    __super_close = asyncore.dispatcher.close

    __closed = 1 # Marker indicating that we're closed

    socket = None # to outwit Sam's getattr

    def __init__(self, sock, addr, map=None, debug=None):
        self.addr = addr
        if debug is not None:
            self._debug = debug
        elif not hasattr(self, '_debug'):
            self._debug = __debug__
        # __input_lock protects __inp, __input_len, __state, __msg_size
        self.__input_lock = threading.Lock()
        self.__inp = None # None, a single String, or a list
        self.__input_len = 0
        # Instance variables __state and __msg_size work together:
        #   when __state == 0:
        #     __msg_size == 4, and the next thing read is a message size;
        #   when __state == 1:
        #     __msg_size is variable, and the next thing read is a message.
        # The next thing read is always of length __msg_size.
        # The state alternates between 0 and 1.
        self.__state = 0
        self.__msg_size = 4
        self.__output_lock = threading.Lock() # Protects __output
        self.__output = []
        self.__closed = 0
        self.__super_init(sock, map)

    # XXX avoid expensive getattr calls?  Can't remember exactly what
    # this comment was supposed to mean, but it has something to do
    # with the way asyncore uses getattr and uses if sock:
    def __nonzero__(self):
        return 1

    def handle_read(self):
        self.__input_lock.acquire()
        try:
            # Use a single __inp buffer and integer indexes to make this fast.
            try:
                d = self.recv(8192)
            except socket.error, err:
                if err[0] in expected_socket_read_errors:
                    return
                raise
            if not d:
                return

            input_len = self.__input_len + len(d)
            msg_size = self.__msg_size
            state = self.__state

            inp = self.__inp
            if msg_size > input_len:
                if inp is None:
                    self.__inp = d
                elif type(self.__inp) is StringType:
                    self.__inp = [self.__inp, d]
                else:
                    self.__inp.append(d)
                self.__input_len = input_len
                return # keep waiting for more input

            # load all previous input and d into single string inp
            if isinstance(inp, StringType):
                inp = inp + d
            elif inp is None:
                inp = d
            else:
                inp.append(d)
                inp = "".join(inp)

            offset = 0
            while (offset + msg_size) <= input_len:
                msg = inp[offset:offset + msg_size]
                offset = offset + msg_size
                if not state:
                    # waiting for message
                    msg_size = struct.unpack(">i", msg)[0]
                    state = 1
                else:
                    msg_size = 4
                    state = 0
                    # XXX We call message_input() with __input_lock
                    # held!!!  And message_input() may end up calling
                    # message_output(), which has its own lock.  But
                    # message_output() cannot call message_input(), so
                    # the locking order is always consistent, which
                    # prevents deadlock.  Also, message_input() may
                    # take a long time, because it can cause an
                    # incoming call to be handled.  During all this
                    # time, the __input_lock is held.  That's a good
                    # thing, because it serializes incoming calls.
                    self.message_input(msg)

            self.__state = state
            self.__msg_size = msg_size
            self.__inp = inp[offset:]
            self.__input_len = input_len - offset
        finally:
            self.__input_lock.release()

    def readable(self):
        return 1

    def writable(self):
        if len(self.__output) == 0:
            return 0
        else:
            return 1

    def handle_write(self):
        self.__output_lock.acquire()
        try:
            output = self.__output
            while output:
                # Accumulate output into a single string so that we avoid
                # multiple send() calls, but avoid accumulating too much
                # data.  If we send a very small string and have more data
                # to send, we will likely incur delays caused by the
                # unfortunate interaction between the Nagle algorithm and
                # delayed acks.  If we send a very large string, only a
                # portion of it will actually be delivered at a time.

                l = 0
                for i in range(len(output)):
                    l += len(output[i])
                    if l > SEND_SIZE:
                        break

                i += 1
                # It is very unlikely that i will be 1.
                v = "".join(output[:i])
                del output[:i]

                try:
                    n = self.send(v)
                except socket.error, err:
                    if err[0] in expected_socket_write_errors:
                        break # we couldn't write anything
                    raise
                if n < len(v):
                    output.insert(0, v[n:])
                    break # we can't write any more
        finally:
            self.__output_lock.release()

    def handle_close(self):
        self.close()

    def message_output(self, message):
        if __debug__:
            if self._debug:
                log('message_output %d bytes: %s' %
                    (len(message), short_repr(message)),
                    level=zLOG.TRACE)

        if self.__closed:
            raise Disconnected, (
                "This action is temporarily unavailable."
                "<p>"
                )
        self.__output_lock.acquire()
        try:
            # do two separate appends to avoid copying the message string
            self.__output.append(struct.pack(">i", len(message)))
            if len(message) <= SEND_SIZE:
                self.__output.append(message)
            else:
                for i in range(0, len(message), SEND_SIZE):
                    self.__output.append(message[i:i+SEND_SIZE])
        finally:
            self.__output_lock.release()

    def close(self):
        if not self.__closed:
            self.__closed = 1
            self.__super_close()


=== Added File Zope/lib/python/ZEO/zrpc/trigger.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore

import os
import socket
import string
import thread

if os.name == 'posix':

    class trigger(asyncore.file_dispatcher):

        "Wake up a call to select() running in the main thread"

        # This is useful in a context where you are using Medusa's I/O
        # subsystem to deliver data, but the data is generated by another
        # thread.  Normally, if Medusa is in the middle of a call to
        # select(), new output data generated by another thread will have
        # to sit until the call to select() either times out or returns.
        # If the trigger is 'pulled' by another thread, it should immediately
        # generate a READ event on the trigger object, which will force the
        # select() invocation to return.

        # A common use for this facility: letting Medusa manage I/O for a
        # large number of connections; but routing each request through a
        # thread chosen from a fixed-size thread pool.  When a thread is
        # acquired, a transaction is performed, but output data is
        # accumulated into buffers that will be emptied more efficiently
        # by Medusa. [picture a server that can process database queries
        # rapidly, but doesn't want to tie up threads waiting to send data
        # to low-bandwidth connections]

        # The other major feature provided by this class is the ability to
        # move work back into the main thread: if you call pull_trigger()
        # with a thunk argument, when select() wakes up and receives the
        # event it will call your thunk from within that thread.  The main
        # purpose of this is to remove the need to wrap thread locks around
        # Medusa's data structures, which normally do not need them.  [To see
        # why this is true, imagine this scenario: A thread tries to push some
        # new data onto a channel's outgoing data queue at the same time that
        # the main thread is trying to remove some]

        def __init__(self):
            r, w = self._fds = os.pipe()
            self.trigger = w
            asyncore.file_dispatcher.__init__(self, r)
            self.lock = thread.allocate_lock()
            self.thunks = []
            self._closed = 0

        # Override the asyncore close() method, because it seems that
        # it would only close the r file descriptor and not w.  The
        # constructor calls file_dispactcher.__init__ and passes r,
        # which would get stored in a file_wrapper and get closed by
        # the default close.  But that would leave w open...

        def close(self):
            if not self._closed:
                self._closed = 1
                self.del_channel()
                for fd in self._fds:
                    os.close(fd)

        def __repr__(self):
            return '<select-trigger (pipe) at %x>' % id(self)

        def readable(self):
            return 1

        def writable(self):
            return 0

        def handle_connect(self):
            pass

        def pull_trigger(self, thunk=None):
            if thunk:
                self.lock.acquire()
                try:
                    self.thunks.append(thunk)
                finally:
                    self.lock.release()
            os.write(self.trigger, 'x')

        def handle_read(self):
            self.recv(8192)
            self.lock.acquire()
            try:
                for thunk in self.thunks:
                    try:
                        thunk()
                    except:
                        nil, t, v, tbinfo = asyncore.compact_traceback()
                        print ('exception in trigger thunk:'
                               ' (%s:%s %s)' % (t, v, tbinfo))
                self.thunks = []
            finally:
                self.lock.release()

else:

    # XXX Should define a base class that has the common methods and
    # then put the platform-specific in a subclass named trigger.

    # win32-safe version

    HOST = '127.0.0.1'
    MINPORT = 19950
    NPORTS = 50

    class trigger(asyncore.dispatcher):

        portoffset = 0

        def __init__(self):
            a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            # set TCP_NODELAY to true to avoid buffering
            w.setsockopt(socket.IPPROTO_TCP, 1, 1)

            # tricky: get a pair of connected sockets
            for i in range(NPORTS):
                trigger.portoffset = (trigger.portoffset + 1) % NPORTS
                port = MINPORT + trigger.portoffset
                address = (HOST, port)
                try:
                    a.bind(address)
                except socket.error:
                    continue
                else:
                    break
            else:
                raise RuntimeError, 'Cannot bind trigger!'

            a.listen(1)
            w.setblocking(0)
            try:
                w.connect(address)
            except:
                pass
            r, addr = a.accept()
            a.close()
            w.setblocking(1)
            self.trigger = w

            asyncore.dispatcher.__init__(self, r)
            self.lock = thread.allocate_lock()
            self.thunks = []
            self._trigger_connected = 0

        def __repr__(self):
            return '<select-trigger (loopback) at %x>' % id(self)

        def readable(self):
            return 1

        def writable(self):
            return 0

        def handle_connect(self):
            pass

        def pull_trigger(self, thunk=None):
            if thunk:
                self.lock.acquire()
                try:
                    self.thunks.append(thunk)
                finally:
                    self.lock.release()
            self.trigger.send('x')

        def handle_read(self):
            self.recv(8192)
            self.lock.acquire()
            try:
                for thunk in self.thunks:
                    try:
                        thunk()
                    except:
                        nil, t, v, tbinfo = asyncore.compact_traceback()
                        print ('exception in trigger thunk:'
                               ' (%s:%s %s)' % (t, v, tbinfo))
                self.thunks = []
            finally:
                self.lock.release()