[Zodb-checkins] CVS: ZEO/ZEO/zrpc - NOTES:1.2 __init__.py:1.2 client.py:1.2 connection.py:1.2 error.py:1.2 log.py:1.2 marshal.py:1.2 server.py:1.2 trigger.py:1.2

Jeremy Hylton jeremy@zope.com
Tue, 11 Jun 2002 15:22:27 -0400


Update of /cvs-repository/ZEO/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv15241/zrpc

Added Files:
	NOTES __init__.py client.py connection.py error.py log.py 
	marshal.py server.py trigger.py 
Log Message:
Merge ZEO2-branch to trunk.  (Files added on branch.)



=== ZEO/ZEO/zrpc/NOTES 1.1 => 1.2 ===
+handling for outstanding calls.  In particular, it should be possible
+to have multiple calls with return values outstanding.
+
+The mechanism described here is based on the promises mechanism in
+Argus, which was influenced by futures in Multilisp.
+
+    Promises: Linguistic Support for Efficient Asynchronous Procedure
+    Calls in Distributed Systems.  Barbara Liskov and Liuba Shrira.
+    Proc. of Conf. on Programming Language Design and Implementation
+    (PLDI), June 1988.
+
+We want to support two different kinds of calls:
+
+  - send : invoke a method that returns no value
+  - call : invoke a method that returns a value
+
+On the client, a call immediately returns a promise.  A promise is an
+object that can be used to claim the return value when it becomes
+available. 
+
+  - ready(): returns true if the return value is ready or an exception
+             occurred
+  - claim(): returns the call's return value or raises an exception,
+             blocking if necessary
+
+The server side of a zrpc connection can be implemented using
+asyncore.  In that case, a method call blocks other RPC activity until
+it returns.  If a call needs to return a value, but can't return
+immediately, it returns a delay object (ZEO.zrpc.server.Delay).  
+
+When the zrpc connection receives a Delay object, it does not
+immediately return to the caller.  Instead, it returns when the
+reply() method is called.  A Delay has two methods:
+
+  - set_sender()
+  - reply(obj): returns obj to the sender
+
+-----------------------------------------
+
+Open issues:
+
+Delayed exception
+
+There is currently no mechanism to raise an exception from a delayed
+pcall. 
+
+Synchronization
+
+The following item is part of Argus, but the motivation isn't entirely
+clear.
+
+    For any two calls, C1 and C2, C1 always starts on the server
+    first.  For the promises, C2 is ready() iff C1 is also ready().
+    The promises can be claimed in any order.
+
+A related notion:
+
+    The connection should also support a synch() method that returns
+    only when all outstanding calls have completed.  If any of these
+    calls raised an exception, the synch() call raises an exception.
+
+XXX synch() sounds potentially useful, but it's not clear if it would
+be useful for ZEO.  In ZEO a single connection object handles multiple
+threads, and each thread makes independent calls.  When a
+particular tpc_begin() returns and a thread commits its transaction,
+it makes more calls.  These calls will happen before any of the other
+tpc_begin() calls.
+
+I think the Argus approach would be to use separate handlers for each
+thread (not sure Argus had threads), so that a single thread could
+rely on ordering guarantees.
+
+Multithreaded server
+
+There are lots of issues to work out here.
+
+Delays may not be necessary if the connection handler runs in a
+different thread than the object that handles the calls.  
\ No newline at end of file
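
Editor's sketch (not part of the check-in): the ready()/claim() interface
described in NOTES could look roughly like the following.  This is a minimal
illustration in modern Python, assuming a threading.Event signals completion;
the class name Promise and the fulfill()/fail() hooks are hypothetical and do
not appear in zrpc.

```python
import threading


class Promise:
    """Hypothetical promise holding the eventual result of one call."""

    def __init__(self):
        self._done = threading.Event()
        self._value = None
        self._error = None

    def fulfill(self, value):
        # Hypothetical hook: called by whatever receives the reply.
        self._value = value
        self._done.set()

    def fail(self, error):
        # Hypothetical hook: called when the remote call raised.
        self._error = error
        self._done.set()

    def ready(self):
        # True once a return value or an exception is available.
        return self._done.is_set()

    def claim(self):
        # Block if necessary, then return the value or raise the error.
        self._done.wait()
        if self._error is not None:
            raise self._error
        return self._value


p = Promise()
was_ready_before = p.ready()
# Simulate a reply arriving from another thread a moment later.
threading.Timer(0.05, p.fulfill, args=(42,)).start()
result = p.claim()  # blocks until the timer thread fulfills the promise
```

Because each promise carries its own event, several calls could be
outstanding at once and claimed in any order, which is the property the notes
ask for.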


=== ZEO/ZEO/zrpc/__init__.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
+# zrpc is a package with the following modules
+# error -- exceptions raised by zrpc
+# marshal -- internal, handles basic protocol issues
+# connection -- object dispatcher
+# client -- manages connection creation to remote server
+# server -- manages incoming connections from remote clients
+# trigger -- medusa's trigger


=== ZEO/ZEO/zrpc/client.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
+import errno
+import select
+import socket
+import sys
+import threading
+import time
+import types
+
+import ThreadedAsync
+import zLOG
+
+from ZEO.zrpc.log import log
+from ZEO.zrpc.trigger import trigger
+from ZEO.zrpc.connection import ManagedConnection
+
+class ConnectionManager:
+    """Keeps a connection up over time"""
+
+    def __init__(self, addr, client, tmin=1, tmax=180):
+        self.set_addr(addr)
+        self.client = client
+        self.tmin = tmin
+        self.tmax = tmax
+        self.connected = 0
+        self.connection = None
+        self.closed = 0
+        # If _thread is not None, then there is a helper thread
+        # attempting to connect.  _thread is protected by _connect_lock.
+        self._thread = None
+        self._connect_lock = threading.Lock()
+        self.trigger = None
+        self.thr_async = 0
+        ThreadedAsync.register_loop_callback(self.set_async)
+
+    def __repr__(self):
+        return "<%s for %s>" % (self.__class__.__name__, self.addr)
+
+    def set_addr(self, addr):
+        "Set one or more addresses to use for server."
+
+        # For backwards compatibility (and simplicity?) the
+        # constructor accepts a single address in the addr argument --
+        # a string for a Unix domain socket or a 2-tuple with a
+        # hostname and port.  It can also accept a list of such addresses.
+
+        addr_type = self._guess_type(addr)
+        if addr_type is not None:
+            self.addr = [(addr_type, addr)]
+        else:
+            self.addr = []
+            for a in addr:
+                addr_type = self._guess_type(a)
+                if addr_type is None:
+                    raise ValueError, "unknown address in list: %s" % repr(a)
+                self.addr.append((addr_type, a))
+
+    def _guess_type(self, addr):
+        if isinstance(addr, types.StringType):
+            return socket.AF_UNIX
+
+        if (len(addr) == 2
+            and isinstance(addr[0], types.StringType)
+            and isinstance(addr[1], types.IntType)):
+            return socket.AF_INET
+
+        # not anything I know about
+        return None
+
+    def close(self):
+        """Prevent ConnectionManager from opening new connections"""
+        self.closed = 1
+        self._connect_lock.acquire()
+        try:
+            if self._thread is not None:
+                # XXX race on _thread
+                self._thread.stop()
+                self._thread.join()
+        finally:
+            self._connect_lock.release()
+        if self.connection:
+            self.connection.close()
+        if self.trigger is not None:
+            self.trigger.close()
+
+    def set_async(self, map):
+        # This is the callback registered with ThreadedAsync.  The
+        # callback might be called multiple times, so it shouldn't
+        # create a trigger every time and should never do anything
+        # after it's closed.
+
+        # It may be that the only case where it is called multiple
+        # times is in the test suite, where ThreadedAsync's loop can
+        # be started in a child process after a fork.  Regardless,
+        # it's good to be defensive.
+
+        # XXX need each connection started with async==0 to have a
+        # callback
+        if not self.closed and self.trigger is None:
+            self.trigger = trigger()
+            self.thr_async = 1 # XXX needs to be set on the Connection
+
+    def attempt_connect(self):
+        """Attempt a connection to the server without blocking too long.
+
+        There isn't a crisp definition for too long.  When a
+        ClientStorage is created, it attempts to connect to the
+        server.  If the server isn't immediately available, it can
+        operate from the cache.  This method will start the background
+        connection thread and wait a little while to see if it
+        finishes quickly.
+        """
+
+        # XXX will a single attempt take too long?
+        self.connect()
+        try:
+            event = self._thread.one_attempt
+        except AttributeError:
+            # An AttributeError means that (1) _thread is None and (2)
+            # as a consequence of (1) that the connect thread has
+            # already exited.
+            pass
+        else:
+            event.wait()
+        return self.connected
+
+    def connect(self, sync=0):
+        if self.connected == 1:
+            return
+        self._connect_lock.acquire()
+        try:
+            if self._thread is None:
+                log("starting thread to connect to server")
+                self._thread = ConnectThread(self, self.client, self.addr,
+                                             self.tmin, self.tmax)
+                self._thread.start()
+            if sync:
+                try:
+                    self._thread.join()
+                except AttributeError:
+                    # probably means the thread exited quickly
+                    pass
+        finally:
+            self._connect_lock.release()
+
+    def connect_done(self, c):
+        log("connect_done()")
+        self.connected = 1
+        self.connection = c
+        self._thread = None
+
+    def notify_closed(self, conn):
+        self.connected = 0
+        self.connection = None
+        self.client.notifyDisconnected()
+        if not self.closed:
+            self.connect()
+
+class Connected(Exception):
+    # helper for non-local exit
+    def __init__(self, sock):
+        self.sock = sock
+
+# When trying to do a connect on a non-blocking socket, some outcomes
+# are expected.  Set _CONNECT_IN_PROGRESS to the errno value(s) expected
+# when an initial connect can't complete immediately.  Set _CONNECT_OK
+# to the errno value(s) expected if the connect succeeds *or* if it's
+# already connected (our code can attempt redundant connects).
+if hasattr(errno, "WSAEWOULDBLOCK"):    # Windows
+    _CONNECT_IN_PROGRESS = (errno.WSAEWOULDBLOCK,)
+    _CONNECT_OK          = (0, errno.WSAEISCONN)
+else:                                   # Unix
+    _CONNECT_IN_PROGRESS = (errno.EINPROGRESS,)
+    _CONNECT_OK          = (0, errno.EISCONN)
+
+class ConnectThread(threading.Thread):
+    """Thread that tries to connect to server given one or more addresses.
+    The thread is passed a ConnectionManager and the manager's client
+    as arguments.  It calls notifyConnected() on the client when a
+    socket connects.  If notifyConnected() returns without raising an
+    exception, the thread is done; it calls connect_done() on the
+    manager and exits.
+
+    The thread will continue to run, attempting connections, until a
+    successful notifyConnected() or stop() is called.
+    """
+
+    __super_init = threading.Thread.__init__
+
+    # We don't expect clients to call any methods of this Thread other
+    # than stop() and those defined by the Thread API.
+
+    def __init__(self, mgr, client, addrs, tmin, tmax):
+        self.__super_init(name="Connect(%s)" % addrs)
+        self.mgr = mgr
+        self.client = client
+        self.addrs = addrs
+        self.tmin = tmin
+        self.tmax = tmax
+        self.stopped = 0
+        self.one_attempt = threading.Event()
+        # A ConnectThread keeps track of whether it has finished a
+        # call to attempt_connects().  This allows the
+        # ConnectionManager to make an attempt to connect right away,
+        # but not block for too long if the server isn't immediately
+        # available.
+
+    def stop(self):
+        self.stopped = 1
+
+    # Every method from run() to the end is used internally by the Thread.
+
+    def run(self):
+        delay = self.tmin
+        while not self.stopped:
+            success = self.attempt_connects()
+            if not self.one_attempt.isSet():
+                self.one_attempt.set()
+            if success:
+                break
+            time.sleep(delay)
+            delay *= 2
+            if delay > self.tmax:
+                delay = self.tmax
+        log("thread exiting: %s" % self.getName())
+
+    def close_sockets(self):
+        for s in self.sockets.keys():
+            s.close()
+
+    def attempt_connects(self):
+        """Try connecting to all self.addrs addresses.
+
+        If at least one succeeds, pick a success arbitrarily, close all other
+        successes (if any), and return true.  If none succeed, return false.
+        """
+
+        self.sockets = {}  # {open socket:  connection address}
+
+        log("attempting connection on %d sockets" % len(self.addrs))
+        try:
+            for domain, addr in self.addrs:
+                if __debug__:
+                    log("attempt connection to %s" % repr(addr),
+                        level=zLOG.DEBUG)
+                try:
+                    s = socket.socket(domain, socket.SOCK_STREAM)
+                except socket.error, err:
+                    log("Failed to create socket with domain=%s: %s" % (
+                        domain, err), level=zLOG.ERROR)
+                    continue
+                s.setblocking(0)
+                self.sockets[s] = addr
+                # connect() raises Connected iff it succeeds
+                # XXX can still block for a while if addr requires DNS
+                self.connect(s)
+
+            # next wait until they actually connect
+            while self.sockets:
+                if self.stopped:
+                    self.close_sockets()
+                    return 0
+                try:
+                    sockets = self.sockets.keys()
+                    r, w, x = select.select([], sockets, sockets, 1.0)
+                except select.error:
+                    continue
+                for s in x:
+                    del self.sockets[s]
+                    s.close()
+                for s in w:
+                    # connect() raises Connected iff it succeeds
+                    self.connect(s)
+        except Connected, container:
+            s = container.sock
+            del self.sockets[s] # don't close the newly connected socket
+            self.close_sockets()
+            return 1
+        return 0
+
+    def connect(self, s):
+        """Call s.connect_ex(addr); raise Connected iff connection succeeds.
+
+        We have to handle several possible return values from
+        connect_ex().  If the socket is connected and the initial ZEO
+        setup works, we're done.  Report success by raising an
+        exception.  Yes, this is odd, but we need to bail out of the
+        select() loop in the caller and an exception is a principled
+        way to do the abort.
+
+        If the socket connects and the initial ZEO setup
+        (notifyConnected()) fails or the connect_ex() returns an
+        error, we close the socket, remove it from self.sockets, and
+        proceed with the other sockets.
+
+        If connect_ex() returns EINPROGRESS, we need to try again later.
+        """
+        addr = self.sockets[s]
+        try:
+            e = s.connect_ex(addr)
+        except socket.error, msg:
+            log("failed to connect to %s: %s" % (addr, msg),
+                level=zLOG.ERROR)
+        else:
+            log("connect_ex(%s) == %s" % (addr, e))
+            if e in _CONNECT_IN_PROGRESS:
+                return
+            elif e in _CONNECT_OK:
+                # special cases to deal with winsock oddities
+                if sys.platform.startswith("win") and e == 0:
+                    
+                    # It appears that winsock isn't behaving as
+                    # expected on Win2k.  It's possible for connect()
+                    # to return 0, but the connection to have failed.
+                    # In particular, in situations where I expect to
+                    # get a Connection refused (10061), I'm seeing
+                    # connect_ex() return 0.  OTOH, it looks like
+                    # select() is a more reliable indicator on
+                    # Windows.
+                    
+                    r, w, x = select.select([s], [s], [s], 0.1)
+                    if not (r or w or x):
+                        return
+                    if x:
+                        # see comment at the end of the function
+                        s.close()
+                        del self.sockets[s]
+                        # the socket is gone; don't test it below
+                        return
+                c = self.test_connection(s, addr)
+                if c:
+                    log("connected to %s" % repr(addr), level=zLOG.DEBUG)
+                    raise Connected(s)
+            else:
+                log("error connecting to %s: %s" % (addr, errno.errorcode[e]),
+                    level=zLOG.DEBUG)
+        # Any execution that doesn't raise Connected() or return
+        # because of CONNECT_IN_PROGRESS is an error.  Make sure the
+        # socket is closed and remove it from the dict of pending
+        # sockets.
+        s.close()
+        del self.sockets[s]
+
+    def test_connection(self, s, addr):
+        # Establish a connection at the zrpc level and call the
+        # client's notifyConnected(), giving the zrpc application a
+        # chance to do app-level check of whether the connection is
+        # okay.
+        c = ManagedConnection(s, addr, self.client, self.mgr)
+        try:
+            self.client.notifyConnected(c)
+        except:
+            log("error connecting to server: %s" % str(addr),
+                level=zLOG.ERROR, error=sys.exc_info())
+            c.close()
+            # Closing the ZRPC connection will eventually close the
+            # socket, somewhere in asyncore.
+            return 0
+        self.mgr.connect_done(c)
+        return 1
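
Editor's sketch (not part of the check-in): the retry schedule in
ConnectThread.run() doubles the sleep after each failed round of attempts and
caps it at tmax.  The helper below reproduces only that arithmetic in modern
Python; the function name backoff_delays and the attempts parameter are
hypothetical, added for illustration.

```python
def backoff_delays(tmin=1, tmax=180, attempts=10):
    """Return the sleep intervals used after successive failed attempts."""
    delays = []
    delay = tmin
    for _ in range(attempts):
        delays.append(delay)            # time.sleep(delay) in run()
        delay = min(delay * 2, tmax)    # double, but never exceed tmax
    return delays


schedule = backoff_delays()
```

With the defaults taken from ConnectionManager.__init__ (tmin=1, tmax=180),
the interval reaches the cap after eight doublings and stays there.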


=== ZEO/ZEO/zrpc/connection.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
+import asyncore
+import sys
+import threading
+import types
+
+import ThreadedAsync
+from ZEO import smac # XXX put smac in zrpc?
+from ZEO.zrpc.error import ZRPCError, DisconnectedError, DecodingError
+from ZEO.zrpc.log import log, short_repr
+from ZEO.zrpc.marshal import Marshaller
+from ZEO.zrpc.trigger import trigger
+import zLOG
+from ZODB import POSException
+
+REPLY = ".reply" # message name used for replies
+ASYNC = 1
+
+class Delay:
+    """Used to delay response to client for synchronous calls
+
+    When a synchronous call is made and the original handler returns
+    without handling the call, it returns a Delay object that prevents
+    the mainloop from sending a response.
+    """
+
+    def set_sender(self, msgid, send_reply):
+        self.msgid = msgid
+        self.send_reply = send_reply
+
+    def reply(self, obj):
+        self.send_reply(self.msgid, obj)
+
+class Connection(smac.SizedMessageAsyncConnection):
+    """Dispatcher for RPC on object on both sides of socket.
+
+    The connection supports synchronous calls, which expect a return,
+    and asynchronous calls that do not.
+
+    It uses the Marshaller class to handle encoding and decoding of
+    method calls and arguments.
+
+    A Connection is designed for use in a multithreaded application,
+    where a synchronous call must block until a response is ready.
+    The current design only allows a single synchronous call to be
+    outstanding.
+
+    A socket connection between a client and a server allows either
+    side to invoke methods on the other side.  The processes on each
+    end of the socket use a Connection object to manage communication.
+    """
+
+    __super_init = smac.SizedMessageAsyncConnection.__init__
+    __super_close = smac.SizedMessageAsyncConnection.close
+    __super_writable = smac.SizedMessageAsyncConnection.writable
+    __super_message_output = smac.SizedMessageAsyncConnection.message_output
+
+    protocol_version = "Z200"
+
+    def __init__(self, sock, addr, obj=None):
+        self.obj = None
+        self.marshal = Marshaller()
+        self.closed = 0
+        self.msgid = 0
+        self.__super_init(sock, addr)
+        # A Connection either uses asyncore directly or relies on an
+        # asyncore mainloop running in a separate thread.  If
+        # thr_async is true, then the mainloop is running in a
+        # separate thread.  If thr_async is true, then the asyncore
+        # trigger (self.trigger) is used to notify that thread of
+        # activity on the current thread.
+        self.thr_async = 0
+        self.trigger = None
+        self._prepare_async()
+        self._map = {self._fileno: self}
+        self.__call_lock = threading.Lock()
+        # The reply lock is used to block when a synchronous call is
+        # waiting for a response
+        self.__reply_lock = threading.Lock()
+        self.__reply_lock.acquire()
+        self.register_object(obj)
+        self.handshake()
+
+    def __repr__(self):
+        return "<%s %s>" % (self.__class__.__name__, self.addr)
+
+    def close(self):
+        if self.closed:
+            return
+        self.closed = 1
+        self.close_trigger()
+        self.__super_close()
+
+    def close_trigger(self):
+        # overridden by ManagedConnection
+        if self.trigger is not None:
+            self.trigger.close()
+
+    def register_object(self, obj):
+        """Register obj as the true object to invoke methods on"""
+        self.obj = obj
+
+    def handshake(self):
+        # When a connection is created the first message sent is a
+        # 4-byte protocol version.  This mechanism should allow the
+        # protocol to evolve over time, and let servers handle clients
+        # using multiple versions of the protocol.
+
+        # The mechanism replaces the message_input() method for the
+        # first message received.
+
+        # The client sends the protocol version it is using.
+        self._message_input = self.message_input
+        self.message_input = self.recv_handshake
+        self.message_output(self.protocol_version)
+
+    def recv_handshake(self, message):
+        if message == self.protocol_version:
+            self.message_input = self._message_input
+        # otherwise do something else...
+    
+    def message_input(self, message):
+        """Decode an incoming message and dispatch it"""
+        # XXX Not sure what to do with errors that reach this level.
+        # Need to catch ZRPCErrors in handle_reply() and
+        # handle_request() so that they get back to the client.
+        try:
+            msgid, flags, name, args = self.marshal.decode(message)
+        except DecodingError, msg:
+            return self.return_error(None, None, DecodingError, msg)
+
+        if __debug__:
+            log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
+                                              short_repr(args)),
+                level=zLOG.DEBUG)
+        if name == REPLY:
+            self.handle_reply(msgid, flags, args)
+        else:
+            self.handle_request(msgid, flags, name, args)
+
+    def handle_reply(self, msgid, flags, args):
+        if __debug__:
+            log("recv reply: %s, %s, %s" % (msgid, flags, str(args)[:40]),
+                level=zLOG.DEBUG)
+        self.__reply = msgid, flags, args
+        self.__reply_lock.release() # will fail if lock is unlocked
+
+    def handle_request(self, msgid, flags, name, args):
+        if not self.check_method(name):
+            msg = "Invalid method name: %s on %s" % (name, repr(self.obj))
+            raise ZRPCError(msg)
+        if __debug__:
+            log("%s%s" % (name, args), level=zLOG.BLATHER)
+
+        meth = getattr(self.obj, name)
+        try:
+            ret = meth(*args)
+        except Exception, msg:
+            error = sys.exc_info()[:2]
+            log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
+            return self.return_error(msgid, flags, error[0], error[1])
+
+        if flags & ASYNC:
+            if ret is not None:
+                raise ZRPCError("async method %s returned value %s" %
+                                (name, repr(ret)))
+        else:
+            if __debug__:
+                log("%s return %s" % (name, short_repr(ret)), zLOG.DEBUG)
+            if isinstance(ret, Delay):
+                ret.set_sender(msgid, self.send_reply)
+            else:
+                self.send_reply(msgid, ret)
+
+    def handle_error(self):
+        self.log_error()
+        self.close()
+
+    def log_error(self, msg="No error message supplied"):
+        log(msg, zLOG.ERROR, error=sys.exc_info())
+
+    def check_method(self, name):
+        # XXX Is this sufficient "security" for now?
+        if name.startswith('_'):
+            return None
+        return hasattr(self.obj, name)
+
+    def send_reply(self, msgid, ret):
+        msg = self.marshal.encode(msgid, 0, REPLY, ret)
+        self.message_output(msg)
+
+    def return_error(self, msgid, flags, err_type, err_value):
+        if flags is None:
+            self.log_error("Exception raised during decoding")
+            return
+        if flags & ASYNC:
+            self.log_error("Asynchronous call raised exception: %s" % self)
+            return
+        if type(err_value) is not types.InstanceType:
+            err_value = err_type, err_value
+
+        try:
+            msg = self.marshal.encode(msgid, 0, REPLY, (err_type, err_value))
+        except self.marshal.errors:
+            err = ZRPCError("Couldn't pickle error %s" % `err_value`)
+            msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
+        self.message_output(msg)
+        self._do_async_poll()
+
+    # The next two public methods (call and callAsync) are used by
+    # clients to invoke methods on remote objects
+
+    def call(self, method, *args):
+        self.__call_lock.acquire()
+        try:
+            return self._call(method, args)
+        finally:
+            self.__call_lock.release()
+
+    def _call(self, method, args):
+        if self.closed:
+            raise DisconnectedError("This action is temporarily unavailable")
+        msgid = self.msgid
+        self.msgid = self.msgid + 1
+        if __debug__:
+            log("send msg: %d, 0, %s, ..." % (msgid, method))
+        self.message_output(self.marshal.encode(msgid, 0, method, args))
+
+        # XXX implementation of promises starts here
+
+        self.__reply = None
+        # reply lock is currently held
+        self._do_async_loop()
+        # reply lock is held again...
+        r_msgid, r_flags, r_args = self.__reply
+        self.__reply_lock.acquire()
+        assert r_msgid == msgid, "%s != %s: %s" % (r_msgid, msgid, r_args)
+
+        if type(r_args) == types.TupleType \
+           and type(r_args[0]) == types.ClassType \
+           and issubclass(r_args[0], Exception):
+            raise r_args[1] # error raised by server
+        return r_args
+
+    def callAsync(self, method, *args):
+        self.__call_lock.acquire()
+        try:
+            self._callAsync(method, args)
+        finally:
+            self.__call_lock.release()
+
+    def _callAsync(self, method, args):
+        if self.closed:
+            raise DisconnectedError("This action is temporarily unavailable")
+        msgid = self.msgid
+        self.msgid += 1
+        if __debug__:
+            log("send msg: %d, %d, %s, ..." % (msgid, ASYNC, method))
+        self.message_output(self.marshal.encode(msgid, ASYNC, method, args))
+        # XXX The message won't go out right away in this case.  It
+        # will wait for the asyncore loop to get control again.  Seems
+        # okay to comment out for now, but need to understand better.
+        self._do_async_poll()
+
+    # handle IO, possibly in async mode
+
+    def _prepare_async(self):
+        self.thr_async = 0
+        ThreadedAsync.register_loop_callback(self.set_async)
+        # XXX If we are not in async mode, this will cause dead
+        # Connections to be leaked.
+
+    def set_async(self, map):
+        self.trigger = trigger()
+        self.thr_async = 1
+
+    def is_async(self):
+        if self.thr_async:
+            return 1
+        else:
+            return 0
+
+    def _do_async_loop(self):
+        "Invoke asyncore mainloop and wait for reply."
+        if __debug__:
+            log("_do_async_loop() async=%d" % self.is_async(),
+                level=zLOG.DEBUG)
+        if self.is_async():
+            self.trigger.pull_trigger()
+            self.__reply_lock.acquire()
+            # wait until reply...
+        else:
+            # Do loop only if lock is already acquired.  XXX But can't
+            # we already guarantee that the lock is already acquired?
+            while not self.__reply_lock.acquire(0):
+                asyncore.poll(10.0, self._map)
+                if self.closed:
+                    raise DisconnectedError()
+        self.__reply_lock.release()
+
+    def _do_async_poll(self, wait_for_reply=0):
+        "Invoke asyncore mainloop to get pending message out."
+
+        if __debug__:
+            log("_do_async_poll(), async=%d" % self.is_async(),
+                level=zLOG.DEBUG)
+        if self.is_async():
+            self.trigger.pull_trigger()
+        else:
+            asyncore.poll(0.0, self._map)
+
+class ServerConnection(Connection):
+    """Connection on the server side"""
+
+    # The server side does not send a protocol message.  Instead, it
+    # adapts to whatever the client sends it.
+
+class ManagedServerConnection(ServerConnection):
+    """A connection that notifies its ConnectionManager of closing"""
+    __super_init = Connection.__init__
+    __super_close = Connection.close
+
+    def __init__(self, sock, addr, obj, mgr):
+        self.__mgr = mgr
+        self.__super_init(sock, addr, obj)
+        obj.notifyConnected(self)
+
+    def close(self):
+        self.__super_close()
+        self.__mgr.close(self)
+
+class ManagedConnection(Connection):
+    """A connection that notifies its ConnectionManager of closing.
+
+    A managed connection also defers the ThreadedAsync work to its
+    manager.
+    """
+    __super_init = Connection.__init__
+    __super_close = Connection.close
+
+    def __init__(self, sock, addr, obj, mgr):
+        self.__mgr = mgr
+        self.__super_init(sock, addr, obj)
+        self.check_mgr_async()
+
+    def close_trigger(self):
+        # the manager should actually close the trigger
+        del self.trigger
+
+    def set_async(self, map):
+        pass
+
+    def _prepare_async(self):
+        # Don't do the register_loop_callback that the superclass does
+        pass
+
+    def check_mgr_async(self):
+        if not self.thr_async and self.__mgr.thr_async:
+            assert self.__mgr.trigger is not None, \
+                   "manager (%s) has no trigger" % self.__mgr
+            self.thr_async = 1
+            self.trigger = self.__mgr.trigger
+            return 1
+        return 0
+
+    def is_async(self):
+        if self.thr_async:
+            return 1
+        return self.check_mgr_async()
+
+    def close(self):
+        self.__super_close()
+        self.__mgr.notify_closed(self)


=== ZEO/ZEO/zrpc/error.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
+from ZODB import POSException
+from ZEO.Exceptions import Disconnected
+
+class ZRPCError(POSException.StorageError):
+    pass
+
+class DecodingError(ZRPCError):
+    """A ZRPC message could not be decoded."""
+
+class DisconnectedError(ZRPCError, Disconnected):
+    """The database storage is disconnected from the storage server."""


=== ZEO/ZEO/zrpc/log.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
+import os
+import types
+import zLOG
+
+_label = "zrpc:%s" % os.getpid()
+
+def new_label():
+    global _label
+    _label = "zrpc:%s" % os.getpid()
+
+def log(message, level=zLOG.BLATHER, label=None, error=None):
+    zLOG.LOG(label or _label, level, message, error=error)
+
+REPR_LIMIT = 40
+
+def short_repr(obj):
+    "Return an object repr limited to REPR_LIMIT bytes."
+    # Some of the objects being repr'd are large strings.  It wastes
+    # a lot of memory to repr them and then truncate, so special case
+    # them in this function.
+    # Also handle short repr of a tuple containing a long string.
+    if isinstance(obj, types.StringType):
+        obj = obj[:REPR_LIMIT]
+    elif isinstance(obj, types.TupleType):
+        elts = []
+        size = 0
+        for elt in obj:
+            r = repr(elt)
+            elts.append(r)
+            size += len(r)
+            if size > REPR_LIMIT:
+                break
+        obj = tuple(elts)
+    return repr(obj)[:REPR_LIMIT]
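
Note on short_repr() above: the point of the special cases is to slice
before calling repr(), so a multi-megabyte string is never fully repr'd
just to be truncated.  A minimal Python 3 sketch of the same idea follows;
the `limit` parameter is an assumption added for illustration and is not
part of log.py.

```python
REPR_LIMIT = 40  # same value as in log.py above


def short_repr(obj, limit=REPR_LIMIT):
    """Return a repr of obj truncated to at most `limit` characters."""
    if isinstance(obj, (str, bytes)):
        # Slice first so repr() only ever sees a short string.
        obj = obj[:limit]
    elif isinstance(obj, tuple):
        # Collect element reprs until the running size passes the limit.
        elts = []
        size = 0
        for elt in obj:
            r = repr(elt)
            elts.append(r)
            size += len(r)
            if size > limit:
                break
        obj = tuple(elts)
    return repr(obj)[:limit]
```

As in the original, a tuple is rendered as a tuple of element reprs, so
its elements appear quoted in the log output.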


=== ZEO/ZEO/zrpc/marshal.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
+import cPickle
+from cStringIO import StringIO
+import struct
+import types
+
+import zLOG
+from ZEO.zrpc.error import ZRPCError, DecodingError
+from ZEO.zrpc.log import log
+
+class Marshaller:
+    """Marshal requests and replies to send across the network"""
+
+    # It's okay to share a single Pickler as long as it's in fast
+    # mode, which means that it doesn't have a memo.
+
+    pickler = cPickle.Pickler()
+    pickler.fast = 1
+    pickle = pickler.dump
+
+    errors = (cPickle.UnpickleableError,
+              cPickle.UnpicklingError,
+              cPickle.PickleError,
+              cPickle.PicklingError)
+
+    VERSION = 1
+
+    def encode(self, msgid, flags, name, args):
+        """Returns an encoded message"""
+        return self.pickle((msgid, flags, name, args), 1)
+
+    def decode(self, msg):
+        """Decodes msg and returns its parts"""
+        unpickler = cPickle.Unpickler(StringIO(msg))
+        unpickler.find_global = find_global
+
+        try:
+            return unpickler.load() # msgid, flags, name, args
+        except (self.errors, IndexError), err_msg:
+            log("can't decode %s" % repr(msg), level=zLOG.ERROR)
+            raise DecodingError(msg)
+        
+_globals = globals()
+_silly = ('__doc__',)
+
+def find_global(module, name):
+    """Helper for message unpickler"""
+    try:
+        m = __import__(module, _globals, _globals, _silly)
+    except ImportError, msg:
+        raise ZRPCError("import error %s: %s" % (module, msg))
+
+    try:
+        r = getattr(m, name)
+    except AttributeError:
+        raise ZRPCError("module %s has no global %s" % (module, name))
+
+    safe = getattr(r, '__no_side_effects__', 0)
+    if safe:
+        return r
+
+    # XXX what's a better way to do this?  esp w/ 2.1 & 2.2
+    if type(r) == types.ClassType and issubclass(r, Exception):
+        return r
+
+    raise ZRPCError("Unsafe global: %s.%s" % (module, name))
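
Note on find_global() above: it is a whitelist hook for the unpickler, so
a message can only name globals that are exception classes or that carry
a __no_side_effects__ marker.  A rough Python 3 sketch of the same policy,
using the documented pickle.Unpickler.find_class override, is shown below.
The names RestrictedUnpickler and safe_loads are hypothetical, and
pickle.UnpicklingError stands in for ZRPCError.

```python
import io
import pickle


class RestrictedUnpickler(pickle.Unpickler):
    """Resolve only exception classes or objects flagged as safe."""

    def find_class(self, module, name):
        # Like find_global above, this imports the module before checking.
        obj = super().find_class(module, name)
        if getattr(obj, "__no_side_effects__", False):
            return obj
        if isinstance(obj, type) and issubclass(obj, Exception):
            return obj
        raise pickle.UnpicklingError(
            "Unsafe global: %s.%s" % (module, name))


def safe_loads(data):
    """Unpickle data, rejecting any global outside the whitelist."""
    return RestrictedUnpickler(io.BytesIO(data)).load()
```

Plain tuples, strings and numbers never reach find_class, so an ordinary
(msgid, flags, name, args) message decodes unchanged, while a pickle that
names an arbitrary function is refused.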


=== ZEO/ZEO/zrpc/server.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
+import asyncore
+import socket
+import types
+
+from ZEO.zrpc.connection import Connection, Delay
+from ZEO.zrpc.log import log
+
+# Export the main asyncore loop
+loop = asyncore.loop
+
+class Dispatcher(asyncore.dispatcher):
+    """A server that accepts incoming RPC connections"""
+    __super_init = asyncore.dispatcher.__init__
+
+    reuse_addr = 1
+
+    def __init__(self, addr, factory=Connection, reuse_addr=None):
+        self.__super_init()
+        self.addr = addr
+        self.factory = factory
+        self.clients = []
+        if reuse_addr is not None:
+            self.reuse_addr = reuse_addr
+        self._open_socket()
+
+    def _open_socket(self):
+        if type(self.addr) == types.TupleType:
+            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        else:
+            self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        self.set_reuse_addr()
+        log("listening on %s" % str(self.addr))
+        self.bind(self.addr)
+        self.listen(5)
+
+    def writable(self):
+        return 0
+
+    def readable(self):
+        return 1
+
+    def handle_accept(self):
+        try:
+            sock, addr = self.accept()
+        except socket.error, msg:
+            log("accept failed: %s" % msg)
+            return
+        c = self.factory(sock, addr)
+        log("connect from %s: %s" % (repr(addr), c))
+        self.clients.append(c)


=== ZEO/ZEO/zrpc/trigger.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
+# This module is a simplified version of the select_trigger module
+# from Sam Rushing's Medusa server.
+
+import asyncore
+
+import os
+import socket
+import thread
+
+if os.name == 'posix':
+
+    class trigger (asyncore.file_dispatcher):
+
+        "Wake up a call to select() running in the main thread"
+
+        # This is useful in a context where you are using Medusa's I/O
+        # subsystem to deliver data, but the data is generated by another
+        # thread.  Normally, if Medusa is in the middle of a call to
+        # select(), new output data generated by another thread will have
+        # to sit until the call to select() either times out or returns.
+        # If the trigger is 'pulled' by another thread, it should immediately
+        # generate a READ event on the trigger object, which will force the
+        # select() invocation to return.
+
+        # A common use for this facility: letting Medusa manage I/O for a
+        # large number of connections; but routing each request through a
+        # thread chosen from a fixed-size thread pool.  When a thread is
+        # acquired, a transaction is performed, but output data is
+        # accumulated into buffers that will be emptied more efficiently
+        # by Medusa. [picture a server that can process database queries
+        # rapidly, but doesn't want to tie up threads waiting to send data
+        # to low-bandwidth connections]
+
+        # The other major feature provided by this class is the ability to
+        # move work back into the main thread: if you call pull_trigger()
+        # with a thunk argument, when select() wakes up and receives the
+        # event it will call your thunk from within that thread.  The main
+        # purpose of this is to remove the need to wrap thread locks around
+        # Medusa's data structures, which normally do not need them.  [To see
+        # why this is true, imagine this scenario: A thread tries to push some
+        # new data onto a channel's outgoing data queue at the same time that
+        # the main thread is trying to remove some]
+
+        def __init__ (self):
+            r, w = os.pipe()
+            self.trigger = w
+            asyncore.file_dispatcher.__init__ (self, r)
+            self.lock = thread.allocate_lock()
+            self.thunks = []
+
+        def close(self):
+            self.del_channel()
+            self.socket.close() # the read side of the pipe
+            os.close(self.trigger) # the write side of the pipe
+
+        def __repr__ (self):
+            return '<select-trigger (pipe) at %x>' % id(self)
+
+        def readable (self):
+            return 1
+
+        def writable (self):
+            return 0
+
+        def handle_connect (self):
+            pass
+
+        def pull_trigger (self, thunk=None):
+            # print 'PULL_TRIGGER: ', len(self.thunks)
+            if thunk:
+                self.lock.acquire()
+                try:
+                    self.thunks.append (thunk)
+                finally:
+                    self.lock.release()
+            os.write (self.trigger, 'x')
+
+        def handle_read (self):
+            self.recv (8192)
+            self.lock.acquire()
+            try:
+                for thunk in self.thunks:
+                    try:
+                        thunk()
+                    except:
+                        (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
+                        print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
+                self.thunks = []
+            finally:
+                self.lock.release()
+
+else:
+
+    # win32-safe version
+
+    class trigger (asyncore.dispatcher):
+
+        address = ('127.9.9.9', 19999)
+
+        def __init__ (self):
+            a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+            w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+
+            # set TCP_NODELAY to true to avoid buffering
+            w.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+
+            # tricky: get a pair of connected sockets
+            host='127.0.0.1'
+            port=19999
+            while 1:
+                try:
+                    self.address=(host, port)
+                    a.bind(self.address)
+                    break
+                except:
+                    if port <= 19950:
+                        raise 'Bind Error', 'Cannot bind trigger!'
+                    port=port - 1
+
+            a.listen (1)
+            w.setblocking (0)
+            try:
+                w.connect (self.address)
+            except:
+                pass
+            r, addr = a.accept()
+            a.close()
+            w.setblocking (1)
+            self.trigger = w
+
+            asyncore.dispatcher.__init__ (self, r)
+            self.lock = thread.allocate_lock()
+            self.thunks = []
+            self._trigger_connected = 0
+
+        def __repr__ (self):
+            return '<select-trigger (loopback) at %x>' % id(self)
+
+        def readable (self):
+            return 1
+
+        def writable (self):
+            return 0
+
+        def handle_connect (self):
+            pass
+
+        def pull_trigger (self, thunk=None):
+            if thunk:
+                self.lock.acquire()
+                try:
+                    self.thunks.append (thunk)
+                finally:
+                    self.lock.release()
+            self.trigger.send ('x')
+
+        def handle_read (self):
+            self.recv (8192)
+            self.lock.acquire()
+            try:
+                for thunk in self.thunks:
+                    try:
+                        thunk()
+                    except:
+                        (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
+                        print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
+                self.thunks = []
+            finally:
+                self.lock.release()
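
Note on the trigger classes above: the mechanism is the "self-pipe"
pattern.  A worker thread queues a thunk and writes one byte to a pipe;
the thread blocked in select() sees the read end become readable, drains
it, and runs the queued thunks itself.  A minimal Python 3 sketch,
POSIX only and without asyncore, follows.  PipeTrigger and run_once are
hypothetical names used for illustration.

```python
import os
import select
import threading


class PipeTrigger:
    """Wake a select() call from another thread and hand it thunks."""

    def __init__(self):
        self._r, self._w = os.pipe()
        self._lock = threading.Lock()
        self._thunks = []

    def fileno(self):
        # select() accepts any object with a fileno() method.
        return self._r

    def pull_trigger(self, thunk=None):
        if thunk is not None:
            with self._lock:
                self._thunks.append(thunk)
        os.write(self._w, b"x")  # makes the read end readable

    def handle_read(self):
        os.read(self._r, 8192)  # drain the wake-up bytes
        with self._lock:
            thunks, self._thunks = self._thunks, []
        for thunk in thunks:
            thunk()  # runs in the thread that called select()

    def close(self):
        os.close(self._r)
        os.close(self._w)


def run_once(trigger, timeout=5.0):
    """One pass of a select() loop; returns True if the trigger fired."""
    readable, _, _ = select.select([trigger], [], [], timeout)
    if trigger in readable:
        trigger.handle_read()
        return True
    return False
```

Unlike handle_read() in trigger.py, this sketch swaps the thunk list out
under the lock and runs the thunks after releasing it, so a thunk that
calls pull_trigger() again does not deadlock.  That is a design choice of
the sketch, not a description of the code in this checkin.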