[Zodb-checkins] CVS: ZODB4/ZEO/zrpc - smac.py:1.1 __init__.py:1.3 client.py:1.6 connection.py:1.7 error.py:1.3 log.py:1.3 marshal.py:1.3 server.py:1.5 trigger.py:1.3 NOTES:NONE

Jeremy Hylton jeremy@zope.com
Fri, 22 Nov 2002 16:24:54 -0500


Update of /cvs-repository/ZODB4/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv7160/ZEO/zrpc

Modified Files:
	__init__.py client.py connection.py error.py log.py marshal.py 
	server.py trigger.py 
Added Files:
	smac.py 
Removed Files:
	NOTES 
Log Message:
Merge ZEO2 into ZODB4.


=== Added File ZODB4/ZEO/zrpc/smac.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Sized Message Async Connections."""

import asyncore, struct
import threading
from ZEO.Exceptions import Disconnected
import zLOG
from types import StringType

from ZEO.zrpc.log import log, short_repr

import socket, errno

# Collect the errno values in a dictionary so that duplicates collapse
# and each code is listed only once.  We expect that
# EWOULDBLOCK == EAGAIN on most systems -- or that only one of them is
# actually used.

# errno codes treated as expected (non-fatal) when reading.
tmp_dict = {}
for tmp_code in (errno.EWOULDBLOCK, errno.EAGAIN, errno.EINTR):
    tmp_dict[tmp_code] = 0
expected_socket_read_errors = tuple(tmp_dict.keys())

# errno codes treated as expected (non-fatal) when writing.
tmp_dict = {}
for tmp_code in (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOBUFS,
                 errno.EINTR):
    tmp_dict[tmp_code] = 0
expected_socket_write_errors = tuple(tmp_dict.keys())

# Don't leave the scratch names behind in the module namespace.
del tmp_dict, tmp_code

# 60000 was chosen as the socket limit by looking at the largest
# strings that could be passed to send() without blocking.
SEND_SIZE = 60000

class SizedMessageAsyncConnection(asyncore.dispatcher):
    """Asyncore dispatcher that exchanges length-prefixed messages.

    Every message on the wire is preceded by its length, packed as a
    4-byte big-endian signed integer (struct format ">i").

    Incoming bytes are reassembled by handle_read(), which calls
    self.message_input(msg) once for every complete message.
    message_input() is not defined in this class; it has to be
    provided by a subclass.

    Outgoing messages are queued by message_output() and written to
    the socket by handle_write().
    """

    __super_init = asyncore.dispatcher.__init__
    __super_close = asyncore.dispatcher.close

    __closed = 1 # Marker indicating that we're closed

    socket = None # to outwit Sam's getattr

    def __init__(self, sock, addr, map=None, debug=None):
        """Set up the buffers and locks, then register with asyncore.

        sock  -- the socket, passed on to asyncore.dispatcher.__init__
        addr  -- the address, stored and returned by get_addr()
        map   -- optional asyncore socket map, passed on unchanged
        debug -- if not None, stored as self._debug; otherwise
                 self._debug defaults to __debug__ unless the attribute
                 already exists (e.g. was set by a subclass)
        """
        self.addr = addr
        if debug is not None:
            self._debug = debug
        elif not hasattr(self, '_debug'):
            self._debug = __debug__
        # __input_lock protects __inp, __input_len, __state, __msg_size
        self.__input_lock = threading.Lock()
        self.__inp = None # None, a single String, or a list
        self.__input_len = 0
        # Instance variables __state and __msg_size work together:
        #   when __state == 0:
        #     __msg_size == 4, and the next thing read is a message size;
        #   when __state == 1:
        #     __msg_size is variable, and the next thing read is a message.
        # The next thing read is always of length __msg_size.
        # The state alternates between 0 and 1.
        self.__state = 0
        self.__msg_size = 4
        self.__output_lock = threading.Lock() # Protects __output
        self.__output = []
        self.__closed = 0
        self.__super_init(sock, map)

    def get_addr(self):
        """Return the address passed to the constructor."""
        return self.addr

    # XXX avoid expensive getattr calls?  Can't remember exactly what
    # this comment was supposed to mean, but it has something to do
    # with the way asyncore uses getattr and uses if sock:
    def __nonzero__(self):
        """Always true, so truth tests never fall through to getattr."""
        return 1

    def handle_read(self):
        """Read available bytes and dispatch every complete message.

        Called by asyncore when the socket is readable.  Bytes are
        accumulated until at least __msg_size of them are available;
        then size headers and message bodies are peeled off
        alternately, and message_input() is called for each body.
        Any incomplete trailing data is kept for the next call.

        Expected read errors (see expected_socket_read_errors) and
        empty reads are ignored; other socket errors propagate.
        """
        self.__input_lock.acquire()
        try:
            # Use a single __inp buffer and integer indexes to make this fast.
            try:
                d = self.recv(8192)
            except socket.error, err:
                if err[0] in expected_socket_read_errors:
                    return
                raise
            if not d:
                return

            input_len = self.__input_len + len(d)
            msg_size = self.__msg_size
            state = self.__state

            inp = self.__inp
            if msg_size > input_len:
                # Not enough for the next item yet: just stash the new
                # chunk, deferring the join until it is really needed.
                if inp is None:
                    self.__inp = d
                elif isinstance(inp, StringType):
                    self.__inp = [inp, d]
                else:
                    inp.append(d)
                self.__input_len = input_len
                return # keep waiting for more input

            # load all previous input and d into single string inp
            if isinstance(inp, StringType):
                inp = inp + d
            elif inp is None:
                inp = d
            else:
                inp.append(d)
                inp = "".join(inp)

            offset = 0
            while (offset + msg_size) <= input_len:
                msg = inp[offset:offset + msg_size]
                offset = offset + msg_size
                if not state:
                    # msg is the 4-byte size header; the message body
                    # of that size comes next.
                    msg_size = struct.unpack(">i", msg)[0]
                    state = 1
                else:
                    # msg is a complete message body; a size header
                    # comes next.
                    msg_size = 4
                    state = 0
                    # XXX We call message_input() with __input_lock
                    # held!!!  And message_input() may end up calling
                    # message_output(), which has its own lock.  But
                    # message_output() cannot call message_input(), so
                    # the locking order is always consistent, which
                    # prevents deadlock.  Also, message_input() may
                    # take a long time, because it can cause an
                    # incoming call to be handled.  During all this
                    # time, the __input_lock is held.  That's a good
                    # thing, because it serializes incoming calls.
                    self.message_input(msg)

            # Remember where we are and keep the unconsumed tail.
            self.__state = state
            self.__msg_size = msg_size
            self.__inp = inp[offset:]
            self.__input_len = input_len - offset
        finally:
            self.__input_lock.release()

    def readable(self):
        """Always interested in reading."""
        return 1

    def writable(self):
        """Return 1 if there is queued output, else 0."""
        if self.__output:
            return 1
        return 0

    def handle_write(self):
        """Write as much queued output to the socket as it will take.

        Called by asyncore when the socket is writable.  Data that
        could not be sent -- because of a partial write or because
        send() raised socket.error -- is put back at the head of the
        queue, so nothing is dropped.  Expected write errors (see
        expected_socket_write_errors) end the attempt quietly; other
        socket errors propagate.
        """
        self.__output_lock.acquire()
        try:
            output = self.__output
            while output:
                # Accumulate output into a single string so that we avoid
                # multiple send() calls, but avoid accumulating too much
                # data.  If we send a very small string and have more data
                # to send, we will likely incur delays caused by the
                # unfortunate interaction between the Nagle algorithm and
                # delayed acks.  If we send a very large string, only a
                # portion of it will actually be delivered at a time.

                l = 0
                for i in range(len(output)):
                    l += len(output[i])
                    if l > SEND_SIZE:
                        break

                # i is the index of the last chunk to include; turn it
                # into a slice end.  (output is non-empty here, so the
                # loop above ran at least once and i is defined.)
                i += 1
                # It is very unlikely that i will be 1.
                v = "".join(output[:i])
                del output[:i]

                try:
                    n = self.send(v)
                except socket.error, err:
                    # v has already been removed from the queue.  Put
                    # it back before leaving, otherwise these bytes
                    # would be silently lost and the peer would see a
                    # corrupted message stream.
                    output.insert(0, v)
                    if err[0] in expected_socket_write_errors:
                        break # we couldn't write anything
                    raise
                if n < len(v):
                    # Partial write: requeue the unsent remainder.
                    output.insert(0, v[n:])
                    break # we can't write any more
        finally:
            self.__output_lock.release()

    def handle_close(self):
        """Called by asyncore on close events; just close()."""
        self.close()

    def message_output(self, message):
        """Queue message, preceded by its 4-byte length, for sending.

        The data is only appended to the output queue here; it is
        written to the socket by handle_write().

        Raises Disconnected if the connection is closed.
        """
        if __debug__:
            if self._debug:
                log('message_output %d bytes: %s' %
                    (len(message), short_repr(message)),
                    level=zLOG.TRACE)

        if self.__closed:
            raise Disconnected, (
                "This action is temporarily unavailable."
                "<p>"
                )
        self.__output_lock.acquire()
        try:
            # do two separate appends to avoid copying the message string
            self.__output.append(struct.pack(">i", len(message)))
            if len(message) <= SEND_SIZE:
                self.__output.append(message)
            else:
                # Split large messages into SEND_SIZE pieces so that
                # handle_write() can send them a chunk at a time.
                for i in range(0, len(message), SEND_SIZE):
                    self.__output.append(message[i:i+SEND_SIZE])
        finally:
            self.__output_lock.release()

    def close(self):
        """Close the connection; calling it again is a no-op."""
        if not self.__closed:
            self.__closed = 1
            self.__super_close()


=== ZODB4/ZEO/zrpc/__init__.py 1.2 => 1.3 ===
--- ZODB4/ZEO/zrpc/__init__.py:1.2	Tue Jun 11 15:22:26 2002
+++ ZODB4/ZEO/zrpc/__init__.py	Fri Nov 22 16:24:53 2002
@@ -2,19 +2,23 @@
 #
 # Copyright (c) 2001, 2002 Zope Corporation and Contributors.
 # All Rights Reserved.
-# 
+#
 # This software is subject to the provisions of the Zope Public License,
 # Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE
-# 
+#
 ##############################################################################
 # zrpc is a package with the following modules
+# client -- manages connection creation to remote server
+# connection -- object dispatcher
+# log -- logging helper
 # error -- exceptions raised by zrpc
 # marshal -- internal, handles basic protocol issues
-# connection -- object dispatcher
-# client -- manages connection creation to remote server
 # server -- manages incoming connections from remote clients
+# smac -- sized message async connections
 # trigger -- medusa's trigger
+
+# zrpc is not an advertised subpackage of ZEO; its interfaces are internal


=== ZODB4/ZEO/zrpc/client.py 1.5 => 1.6 === (568/668 lines abridged)
--- ZODB4/ZEO/zrpc/client.py:1.5	Tue Aug  6 19:09:20 2002
+++ ZODB4/ZEO/zrpc/client.py	Fri Nov 22 16:24:53 2002
@@ -2,14 +2,14 @@
 #
 # Copyright (c) 2001, 2002 Zope Corporation and Contributors.
 # All Rights Reserved.
-# 
+#
 # This software is subject to the provisions of the Zope Public License,
 # Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE
-# 
+#
 ##############################################################################
 import errno
 import select
@@ -22,6 +22,8 @@
 import ThreadedAsync
 import zLOG
 
+from ZODB.POSException import ReadOnlyError
+
 from ZEO.zrpc.log import log
 from ZEO.zrpc.trigger import trigger
 from ZEO.zrpc.connection import ManagedConnection
@@ -29,43 +31,44 @@
 class ConnectionManager:
     """Keeps a connection up over time"""
 
-    def __init__(self, addr, client, tmin=1, tmax=180):
-        self.set_addr(addr)
+    def __init__(self, addrs, client, tmin=1, tmax=180):
+        self.addrlist = self._parse_addrs(addrs)
         self.client = client
         self.tmin = tmin
         self.tmax = tmax
-        self.connected = 0
-        self.connection = None
+        self.cond = threading.Condition(threading.Lock())
+        self.connection = None # Protected by self.cond
         self.closed = 0
-        # If _thread is not None, then there is a helper thread
-        # attempting to connect.  _thread is protected by _connect_lock.
-        self._thread = None
-        self._connect_lock = threading.Lock()
+        # If thread is not None, then there is a helper thread
+        # attempting to connect.

[-=- -=- -=- 568 lines omitted -=- -=- -=-]

+            return
         except:
-            log("error connecting to server: %s" % str(addr),
+            log("CW: error in testConnection (%s)" % repr(self.addr),
                 level=zLOG.ERROR, error=sys.exc_info())
-            c.close()
+            self.close()
+            return
+        if self.preferred:
+            self.notify_client()
+
+    def notify_client(self):
+        """Call the client's notifyConnected().
+
+        If this succeeds, call the manager's connect_done().
+
+        If the client is already connected, we assume it's a fallback
+        connection, and the new connection must be a preferred
+        connection.  The client will close the old connection.
+        """
+        try:
+            self.client.notifyConnected(self.conn)
+        except:
+            log("CW: error in notifyConnected (%s)" % repr(self.addr),
+                level=zLOG.ERROR, error=sys.exc_info())
+            self.close()
+            return
+        self.state = "notified"
+        self.mgr.connect_done(self.conn, self.preferred)
+
+    def close(self):
+        """Close the socket and reset everything."""
+        self.state = "closed"
+        self.mgr = self.client = None
+        self.preferred = 0
+        if self.conn is not None:
             # Closing the ZRPC connection will eventually close the
             # socket, somewhere in asyncore.
-            return 0
-        self.mgr.connect_done(c)
-        return 1
+            # XXX Why do we care? --Guido
+            self.conn.close()
+            self.conn = None
+        if self.sock is not None:
+            self.sock.close()
+            self.sock = None
+
+    def fileno(self):
+        return self.sock.fileno()


=== ZODB4/ZEO/zrpc/connection.py 1.6 => 1.7 === (459/559 lines abridged)
--- ZODB4/ZEO/zrpc/connection.py:1.6	Tue Aug  6 19:10:31 2002
+++ ZODB4/ZEO/zrpc/connection.py	Fri Nov 22 16:24:53 2002
@@ -2,23 +2,25 @@
 #
 # Copyright (c) 2001, 2002 Zope Corporation and Contributors.
 # All Rights Reserved.
-# 
+#
 # This software is subject to the provisions of the Zope Public License,
 # Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE
-# 
+#
 ##############################################################################
 import asyncore
+import errno
+import select
 import sys
 import threading
 import types
 
 import ThreadedAsync
-from ZEO import smac # XXX put smac in zrpc?
-from ZEO.zrpc.error import ZRPCError, DisconnectedError, DecodingError
+from ZEO.zrpc import smac
+from ZEO.zrpc.error import ZRPCError, DisconnectedError
 from ZEO.zrpc.log import log, short_repr
 from ZEO.zrpc.marshal import Marshaller
 from ZEO.zrpc.trigger import trigger
@@ -36,49 +38,82 @@
     the mainloop from sending a response.
     """
 
-    def set_sender(self, msgid, send_reply):
+    def set_sender(self, msgid, send_reply, return_error):
         self.msgid = msgid
         self.send_reply = send_reply
+        self.return_error = return_error
 
     def reply(self, obj):
         self.send_reply(self.msgid, obj)
 
+    def error(self, exc_info):
+        log("Error raised in delayed method", zLOG.ERROR, error=exc_info)
+        self.return_error(self.msgid, 0, *exc_info[:2])
+
 class MTDelay(Delay):

[-=- -=- -=- 459 lines omitted -=- -=- -=-]

         self.__super_close()
-        self.__mgr.close(self)
 
 class ManagedConnection(Connection):
-    """A connection that notifies its ConnectionManager of closing.
-
-    A managed connection also defers the ThreadedAsync work to its
-    manager.
-    """
+    """Client-side Connection subclass."""
     __super_init = Connection.__init__
     __super_close = Connection.close
 
     def __init__(self, sock, addr, obj, mgr):
-        self.__mgr = mgr
+        self.mgr = mgr
         self.__super_init(sock, addr, obj)
         self.check_mgr_async()
 
+    # Defer the ThreadedAsync work to the manager.
+
     def close_trigger(self):
         # the manager should actually close the trigger
         del self.trigger
@@ -387,19 +479,20 @@
         pass
 
     def check_mgr_async(self):
-        if not self.thr_async and self.__mgr.thr_async:
-            assert self.__mgr.trigger is not None, \
-                   "manager (%s) has no trigger" % self.__mgr
+        if not self.thr_async and self.mgr.thr_async:
+            assert self.mgr.trigger is not None, \
+                   "manager (%s) has no trigger" % self.mgr
             self.thr_async = 1
-            self.trigger = self.__mgr.trigger
+            self.trigger = self.mgr.trigger
             return 1
         return 0
 
     def is_async(self):
+        # XXX could the check_mgr_async() be avoided on each test?
         if self.thr_async:
             return 1
         return self.check_mgr_async()
 
     def close(self):
+        self.mgr.close_conn(self)
         self.__super_close()
-        self.__mgr.notify_closed(self)


=== ZODB4/ZEO/zrpc/error.py 1.2 => 1.3 ===
--- ZODB4/ZEO/zrpc/error.py:1.2	Tue Jun 11 15:22:26 2002
+++ ZODB4/ZEO/zrpc/error.py	Fri Nov 22 16:24:53 2002
@@ -2,23 +2,20 @@
 #
 # Copyright (c) 2001, 2002 Zope Corporation and Contributors.
 # All Rights Reserved.
-# 
+#
 # This software is subject to the provisions of the Zope Public License,
 # Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE
-# 
+#
 ##############################################################################
 from ZODB import POSException
 from ZEO.Exceptions import Disconnected
 
 class ZRPCError(POSException.StorageError):
     pass
-
-class DecodingError(ZRPCError):
-    """A ZRPC message could not be decoded."""
 
 class DisconnectedError(ZRPCError, Disconnected):
     """The database storage is disconnected from the storage server."""


=== ZODB4/ZEO/zrpc/log.py 1.2 => 1.3 ===
--- ZODB4/ZEO/zrpc/log.py:1.2	Tue Jun 11 15:22:26 2002
+++ ZODB4/ZEO/zrpc/log.py	Fri Nov 22 16:24:53 2002
@@ -2,18 +2,21 @@
 #
 # Copyright (c) 2001, 2002 Zope Corporation and Contributors.
 # All Rights Reserved.
-# 
+#
 # This software is subject to the provisions of the Zope Public License,
 # Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE
-# 
+#
 ##############################################################################
 import os
 import types
 import zLOG
+import threading
+
+LOG_THREAD_ID = 0 # Set this to 1 during heavy debugging
 
 _label = "zrpc:%s" % os.getpid()
 
@@ -22,18 +25,34 @@
     _label = "zrpc:%s" % os.getpid()
 
 def log(message, level=zLOG.BLATHER, label=None, error=None):
-    zLOG.LOG(label or _label, level, message, error=error)
+    label = label or _label
+    if LOG_THREAD_ID:
+        label = "%s:%s" % (label, threading.currentThread().getName())
+    zLOG.LOG(label, level, message, error=error)
 
 REPR_LIMIT = 40
 
 def short_repr(obj):
     "Return an object repr limited to REPR_LIMIT bytes."
+
     # Some of the objects being repr'd are large strings.  It's wastes
     # a lot of memory to repr them and then truncate, so special case
     # them in this function.
     # Also handle short repr of a tuple containing a long string.
+
+    # This strategy works well for arguments to StorageServer methods.
+    # The oid is usually first and will get included in its entirety.
+    # The pickle is near the beginning, too, and you can often fit the
+    # module name in the pickle.
+
     if isinstance(obj, types.StringType):
-        obj = obj[:REPR_LIMIT]
+        if len(obj) > REPR_LIMIT:
+            r = repr(obj[:REPR_LIMIT])
+        else:
+            r = repr(obj)
+        if len(r) > REPR_LIMIT:
+            r = r[:REPR_LIMIT-4] + '...' + r[-1]
+        return r
     elif isinstance(obj, types.TupleType):
         elts = []
         size = 0
@@ -43,5 +62,10 @@
             size += len(r)
             if size > REPR_LIMIT:
                 break
-        obj = tuple(elts)
-    return repr(obj)[:REPR_LIMIT]
+        r = "(%s)" % (", ".join(elts))
+    else:
+        r = repr(obj)
+    if len(r) > REPR_LIMIT:
+        return r[:REPR_LIMIT] + '...'
+    else:
+        return r


=== ZODB4/ZEO/zrpc/marshal.py 1.2 => 1.3 ===
--- ZODB4/ZEO/zrpc/marshal.py:1.2	Tue Jun 11 15:22:26 2002
+++ ZODB4/ZEO/zrpc/marshal.py	Fri Nov 22 16:24:53 2002
@@ -2,42 +2,33 @@
 #
 # Copyright (c) 2001, 2002 Zope Corporation and Contributors.
 # All Rights Reserved.
-# 
+#
 # This software is subject to the provisions of the Zope Public License,
 # Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE
-# 
+#
 ##############################################################################
 import cPickle
 from cStringIO import StringIO
-import struct
 import types
 
+import zLOG
+
 from ZEO.zrpc.error import ZRPCError
+from ZEO.zrpc.log import log, short_repr
 
 class Marshaller:
     """Marshal requests and replies to second across network"""
 
-    # It's okay to share a single Pickler as long as it's in fast
-    # mode, which means that it doesn't have a memo.
-
-    pickler = cPickle.Pickler()
-    pickler.fast = 1
-    pickle = pickler.dump
-
-    errors = (cPickle.UnpickleableError,
-              cPickle.UnpicklingError,
-              cPickle.PickleError,
-              cPickle.PicklingError)
-
-    VERSION = 1
-
     def encode(self, msgid, flags, name, args):
         """Returns an encoded message"""
-        return self.pickle((msgid, flags, name, args), 1)
+        # (We used to have a global pickler, but that's not thread-safe. :-( )
+        pickler = cPickle.Pickler()
+        pickler.fast = 1
+        return pickler.dump((msgid, flags, name, args), 1)
 
     def decode(self, msg):
         """Decodes msg and returns its parts"""
@@ -46,10 +37,10 @@
 
         try:
             return unpickler.load() # msgid, flags, name, args
-        except (self.errors, IndexError), err_msg:
-            log("can't decode %s" % repr(msg), level=zLOG.ERROR)
-            raise DecodingError(msg)
-        
+        except:
+            log("can't decode message: %s" % short_repr(msg), level=zLOG.ERROR)
+            raise
+
 _globals = globals()
 _silly = ('__doc__',)
 


=== ZODB4/ZEO/zrpc/server.py 1.4 => 1.5 ===
--- ZODB4/ZEO/zrpc/server.py:1.4	Thu Jul 25 12:47:55 2002
+++ ZODB4/ZEO/zrpc/server.py	Fri Nov 22 16:24:53 2002
@@ -2,14 +2,14 @@
 #
 # Copyright (c) 2001, 2002 Zope Corporation and Contributors.
 # All Rights Reserved.
-# 
+#
 # This software is subject to the provisions of the Zope Public License,
 # Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE
-# 
+#
 ##############################################################################
 import asyncore
 import socket
@@ -17,6 +17,7 @@
 
 from ZEO.zrpc.connection import Connection, Delay
 from ZEO.zrpc.log import log
+import zLOG
 
 # Export the main asyncore loop
 loop = asyncore.loop
@@ -42,7 +43,7 @@
         else:
             self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
         self.set_reuse_addr()
-        log("listening on %s" % str(self.addr))
+        log("listening on %s" % str(self.addr), zLOG.INFO)
         self.bind(self.addr)
         self.listen(5)
 


=== ZODB4/ZEO/zrpc/trigger.py 1.2 => 1.3 ===
--- ZODB4/ZEO/zrpc/trigger.py:1.2	Tue Jun 11 15:22:26 2002
+++ ZODB4/ZEO/zrpc/trigger.py	Fri Nov 22 16:24:53 2002
@@ -2,27 +2,25 @@
 #
 # Copyright (c) 2001, 2002 Zope Corporation and Contributors.
 # All Rights Reserved.
-# 
+#
 # This software is subject to the provisions of the Zope Public License,
 # Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
 # FOR A PARTICULAR PURPOSE
-# 
+#
 ##############################################################################
-# This module is a simplified version of the select_trigger module
-# from Sam Rushing's Medusa server.
 
 import asyncore
-
 import os
 import socket
 import thread
+import errno
 
 if os.name == 'posix':
 
-    class trigger (asyncore.file_dispatcher):
+    class trigger(asyncore.file_dispatcher):
 
         "Wake up a call to select() running in the main thread"
 
@@ -54,129 +52,157 @@
         # new data onto a channel's outgoing data queue at the same time that
         # the main thread is trying to remove some]
 
-        def __init__ (self):
-            r, w = os.pipe()
+        def __init__(self):
+            r, w = self._fds = os.pipe()
             self.trigger = w
-            asyncore.file_dispatcher.__init__ (self, r)
+            asyncore.file_dispatcher.__init__(self, r)
             self.lock = thread.allocate_lock()
             self.thunks = []
+            self._closed = 0
+
+        # Override the asyncore close() method, because it seems that
+        # it would only close the r file descriptor and not w.  The
+        # constructor calls file_dispatcher.__init__ and passes r,
+        # which would get stored in a file_wrapper and get closed by
+        # the default close.  But that would leave w open...
 
         def close(self):
-            self.del_channel()
-            self.socket.close() # the read side of the pipe
-            os.close(self.trigger) # the write side of the pipe
+            if not self._closed:
+                self._closed = 1
+                self.del_channel()
+                for fd in self._fds:
+                    os.close(fd)
+                self._fds = []
 
-        def __repr__ (self):
+        def __repr__(self):
             return '<select-trigger (pipe) at %x>' % id(self)
 
-        def readable (self):
+        def readable(self):
             return 1
 
-        def writable (self):
+        def writable(self):
             return 0
 
-        def handle_connect (self):
+        def handle_connect(self):
             pass
 
-        def pull_trigger (self, thunk=None):
-            # print 'PULL_TRIGGER: ', len(self.thunks)
+        def handle_close(self):
+            self.close()
+
+        def pull_trigger(self, thunk=None):
             if thunk:
+                self.lock.acquire()
                 try:
-                    self.lock.acquire()
-                    self.thunks.append (thunk)
+                    self.thunks.append(thunk)
                 finally:
                     self.lock.release()
-            os.write (self.trigger, 'x')
+            os.write(self.trigger, 'x')
 
-        def handle_read (self):
-            self.recv (8192)
+        def handle_read(self):
+            try:
+                self.recv(8192)
+            except socket.error:
+                return
+            self.lock.acquire()
             try:
-                self.lock.acquire()
                 for thunk in self.thunks:
                     try:
                         thunk()
                     except:
-                        (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
-                        print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
+                        nil, t, v, tbinfo = asyncore.compact_traceback()
+                        print ('exception in trigger thunk:'
+                               ' (%s:%s %s)' % (t, v, tbinfo))
                 self.thunks = []
             finally:
                 self.lock.release()
 
 else:
 
+    # XXX Should define a base class that has the common methods and
+    # then put the platform-specific in a subclass named trigger.
+
     # win32-safe version
 
-    class trigger (asyncore.dispatcher):
+    HOST = '127.0.0.1'
+    MINPORT = 19950
+    NPORTS = 50
+
+    class trigger(asyncore.dispatcher):
 
-        address = ('127.9.9.9', 19999)
+        portoffset = 0
 
-        def __init__ (self):
-            a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
-            w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+        def __init__(self):
+            a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
             # set TCP_NODELAY to true to avoid buffering
             w.setsockopt(socket.IPPROTO_TCP, 1, 1)
 
             # tricky: get a pair of connected sockets
-            host='127.0.0.1'
-            port=19999
-            while 1:
+            for i in range(NPORTS):
+                trigger.portoffset = (trigger.portoffset + 1) % NPORTS
+                port = MINPORT + trigger.portoffset
+                address = (HOST, port)
                 try:
-                    self.address=(host, port)
-                    a.bind(self.address)
+                    a.bind(address)
+                except socket.error:
+                    continue
+                else:
                     break
-                except:
-                    if port <= 19950:
-                        raise 'Bind Error', 'Cannot bind trigger!'
-                    port=port - 1
+            else:
+                raise RuntimeError, 'Cannot bind trigger!'
 
-            a.listen (1)
-            w.setblocking (0)
+            a.listen(1)
+            w.setblocking(0)
             try:
-                w.connect (self.address)
+                w.connect(address)
             except:
                 pass
             r, addr = a.accept()
             a.close()
-            w.setblocking (1)
+            w.setblocking(1)
             self.trigger = w
 
-            asyncore.dispatcher.__init__ (self, r)
+            asyncore.dispatcher.__init__(self, r)
             self.lock = thread.allocate_lock()
             self.thunks = []
             self._trigger_connected = 0
 
-        def __repr__ (self):
+        def __repr__(self):
             return '<select-trigger (loopback) at %x>' % id(self)
 
-        def readable (self):
+        def readable(self):
             return 1
 
-        def writable (self):
+        def writable(self):
             return 0
 
-        def handle_connect (self):
+        def handle_connect(self):
             pass
 
-        def pull_trigger (self, thunk=None):
+        def pull_trigger(self, thunk=None):
             if thunk:
+                self.lock.acquire()
                 try:
-                    self.lock.acquire()
-                    self.thunks.append (thunk)
+                    self.thunks.append(thunk)
                 finally:
                     self.lock.release()
-            self.trigger.send ('x')
+            self.trigger.send('x')
 
-        def handle_read (self):
-            self.recv (8192)
+        def handle_read(self):
+            try:
+                self.recv(8192)
+            except socket.error:
+                return
+            self.lock.acquire()
             try:
-                self.lock.acquire()
                 for thunk in self.thunks:
                     try:
                         thunk()
                     except:
-                        (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
-                        print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
+                        nil, t, v, tbinfo = asyncore.compact_traceback()
+                        print ('exception in trigger thunk:'
+                               ' (%s:%s %s)' % (t, v, tbinfo))
                 self.thunks = []
             finally:
                 self.lock.release()

=== Removed File ZODB4/ZEO/zrpc/NOTES ===