[Zodb-checkins] CVS: StandaloneZODB/ZEO - zrpc2.py:1.1.2.24

Jeremy Hylton jeremy@zope.com
Tue, 8 Jan 2002 23:40:16 -0500


Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv3178

Modified Files:
      Tag: ZEO-ZRPC-Dev
	zrpc2.py 
Log Message:
Change ConnectionManager to support multiple server addresses.

This is a significant reworking of the connect() logic in
ConnectionManager.  It works on an arbitrary list of server
addresses.  It opens a single socket for each address and does a
non-blocking connect.  It stops when it gets the first successful
connection.

The __m_connect() method is the entry point for the connection thread.

The self.addr attribute has changed its use.  It no longer stores a
single address; it stores a list of (domain, address) 2-tuples.  The
constructor accepts a single address or a list and parses each in
advance to determine if it's a Unix domain socket or an Internet
domain socket.


Two other changes:

  - The ConnectionManager keeps a reference to its connection and
    closes the connection when the manager's close() is executed.

  - The notifyDisconnected() callback no longer passes an unused
    None. 



=== StandaloneZODB/ZEO/zrpc2.py 1.1.2.23 => 1.1.2.24 ===
 
 import asyncore
+import errno
 import cPickle
 import os
+import select
 import socket
 import sys
 import threading
@@ -385,12 +387,13 @@
     # notifyDisconnected.   make this optional?
 
     def __init__(self, addr, obj=None, debug=1, tmin=1, tmax=180):
-        self.addr = addr
+        self.set_addr(addr)
         self.obj = obj
         self.tmin = tmin
         self.tmax = tmax
         self.debug = debug
         self.connected = 0
+        self.connection = None
         # If _thread is not None, then there is a helper thread
         # attempting to connect.  _thread is protected by _connect_lock.
         self._thread = None
@@ -403,6 +406,38 @@
     def __repr__(self):
+        "Show the class name and the list of (family, address) server entries."
         return "<%s for %s>" % (self.__class__.__name__, self.addr)
 
+    def set_addr(self, addr):
+        """Set one or more addresses to use for server.
+
+        addr is either a single address -- a string naming a Unix domain
+        socket, or a (hostname, port) 2-tuple -- or a sequence of such
+        addresses.  Afterwards self.addr is a list of
+        (socket family, address) 2-tuples, in the order given.
+
+        Raises ValueError if an element of the sequence is not a
+        recognized address, or if the sequence is empty.  self.addr is
+        only replaced once the whole argument has been parsed.
+        """
+
+        # For backwards compatibility (and simplicity?) the
+        # constructor accepts a single address in the addr argument --
+        # a string for a Unix domain socket or a 2-tuple with a
+        # hostname and port.  It can also accept a list of such addresses.
+        # NOTE: the port must be an int; ("host", "8000") is not an
+        # Internet address and ends up parsed as two Unix socket paths.
+
+        addr_type = self._guess_type(addr)
+        if addr_type is not None:
+            self.addr = [(addr_type, addr)]
+            return
+
+        addrs = []
+        for a in addr:
+            addr_type = self._guess_type(a)
+            if addr_type is None:
+                raise ValueError, "unknown address in list: %s" % repr(a)
+            addrs.append((addr_type, a))
+        if not addrs:
+            # With nothing to try, the connect thread's retry loop
+            # would spin forever without ever making an attempt.
+            raise ValueError, "no server addresses given"
+        self.addr = addrs
+
+    def _guess_type(self, addr):
+        """Return the socket family for a single address, or None.
+
+        A string is taken to be a Unix domain socket path (AF_UNIX).  A
+        2-element sequence of (string hostname, int port) is an Internet
+        address (AF_INET).  Anything else -- including objects that have
+        no length at all -- yields None.
+        """
+        if isinstance(addr, types.StringType):
+            return socket.AF_UNIX
+
+        try:
+            n = len(addr)
+        except TypeError:
+            # e.g. an int in an address list; let the caller report it
+            # as an unknown address instead of failing inside len().
+            return None
+
+        if (n == 2
+            and isinstance(addr[0], types.StringType)
+            and isinstance(addr[1], types.IntType)):
+            return socket.AF_INET
+
+        # not anything I know about
+        return None
+
     def close(self):
         """Prevent ConnectionManager from opening new connections"""
         self.closed = 1
@@ -412,6 +447,8 @@
                 self._thread.join()
         finally:
             self._connect_lock.release()
+        if self.connection:
+            self.connection.close()
 
     def register_object(self, obj):
         self.obj = obj
@@ -427,8 +464,9 @@
         self._connect_lock.acquire()
         try:
             if self._thread is None:
-                self._thread = threading.Thread(target=self.__connect,
-                                                args=(1,))
+                zLOG.LOG(_label, zLOG.BLATHER,
+                         "starting thread to connect to server")
+                self._thread = threading.Thread(target=self.__m_connect)
                 self._thread.start()
             if sync:
                 try:
@@ -440,72 +478,115 @@
             self._connect_lock.release()
 
     def attempt_connect(self):
+        """Make one pass over all server addresses in the calling thread.
+
+        Runs _attempt_connects() once directly -- no connection thread
+        and no backoff/retry -- and returns self.connected.
+        """
-        self.__connect(repeat=0)
+        # XXX will _attempt_connects() take too long?  think select().
+        self._attempt_connects()
         return self.connected
 
-    def __connect(self, repeat=1):
-        """Attempt to connect to StorageServer.
-
-        This method should always be called by attempt_connect() or by
-        connect().
-        """
+    def notify_closed(self, conn):
+        """Callback for a connection created by this manager closing.
+
+        (Presumably invoked by ManagedConnection, which is handed this
+        manager in _test_connection() -- confirm there.)  Marks the
+        manager disconnected, drops self.connection, calls
+        self.obj.notifyDisconnected(), and calls self.connect() to start
+        reconnecting unless close() has set self.closed.  The conn
+        argument is not used.
+        """
+        self.connected = 0
+        self.connection = None
+        self.obj.notifyDisconnected()
+        if not self.closed:
+            self.connect()
 
+    class Connected(Exception):
+        # Raised by _connect_ex() once a socket has connected and passed
+        # _test_connection(), to abort the select() loop in
+        # _attempt_connects().  sock is the connected socket.
+        def __init__(self, sock):
+            self.sock = sock
+            
+    def __m_connect(self):
+        """Entry point for the background connection thread.
+
+        Calls _attempt_connects() until it succeeds or close() sets
+        self.closed, waiting between rounds with exponential backoff
+        from self.tmin up to self.tmax seconds.  self._thread is always
+        cleared on the way out.
+        """
         try:
-            tries = 0
-            t = self.tmin
-            while not (self.connected or self.closed) \
-                  and (repeat or (tries == 0)):
-                tries = tries + 1
-                log("Trying to connect to server")
-                s = self._connect_socket()
-                if s is None:
-                    if repeat:
-                        t = self._wait(t)
-                else:
-                    if self.debug:
-                        log("Connected to server", level=zLOG.DEBUG)
-                    self.connected = 1
-            if self.connected and not self.closed:
-                c = ManagedConnection(s, self.addr, self.obj, self)
-                log("Connection created: %s" % c)
-                try:
-                    self.obj.notifyConnected(c)
-                except:
-                    self._thread = None
-                    c.close()
-                    # When the connection is closed, we'll trigger
-                    # another attempt to reconnect.
+            delay = self.tmin
+            while not (self.closed or self._attempt_connects()):
+                # Sleep in slices of at most one second so that close(),
+                # which joins this thread, is not held up for as long
+                # as self.tmax seconds.
+                deadline = time.time() + delay
+                while not self.closed:
+                    remaining = deadline - time.time()
+                    if remaining <= 0:
+                        break
+                    time.sleep(min(remaining, 1.0))
+                delay = min(delay * 2, self.tmax)
         finally:
-            # must always clear _thread on the way out
             self._thread = None
 
-    def _connect_socket(self):
+    def _attempt_connects(self):
+        """Try every address in self.addr once; return 1 on success, else 0.
+
+        A non-blocking socket is opened for each address, then select()
+        waits (checking self.closed about once a second) until one of
+        them connects and passes _test_connection(), or all have failed.
+        Every socket other than the connected one is closed before
+        returning, including when an unexpected exception propagates.
+        """
+        sockets = {}
+
+        zLOG.LOG(_label, zLOG.BLATHER,
+                 "attempting connection on %d sockets" % len(self.addr))
         try:
-            if type(self.addr) is types.StringType:
-                s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-            else:
-                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            s.connect(self.addr)
-        except socket.error, msg:
-            if self.debug:
-                log("Failed to connect to server: %s" % msg,
-                    level=zLOG.DEBUG)
-            s.close()
-            return None
-        return s
+            try:
+                for domain, addr in self.addr:
+                    if __debug__:
+                        zLOG.LOG(_label, zLOG.DEBUG,
+                                 "attempt connection to %s" % repr(addr))
+                    s = socket.socket(domain, socket.SOCK_STREAM)
+                    # Register s before connecting: the Connected handler
+                    # below must find it even when connect_ex() succeeds
+                    # on the first call, and the finally clause must be
+                    # able to close it if anything raises.
+                    sockets[s] = addr
+                    s.setblocking(0)
+                    # XXX can still block for a while if addr requires DNS
+                    e = self._connect_ex(s, addr)
+                    if e is None:
+                        # _connect_ex() is done with s (it failed or was
+                        # rejected by _test_connection())
+                        del sockets[s]
+
+                # next wait until they actually connect
+                while sockets:
+                    if self.closed:
+                        # the finally clause closes the pending sockets
+                        return 0
+                    try:
+                        r, w, x = select.select([], sockets.keys(), [], 1.0)
+                    except select.error:
+                        continue
+                    for s in w:
+                        e = self._connect_ex(s, sockets[s])
+                        if e is None:
+                            del sockets[s]
+            except self.Connected, container:
+                # The connected socket now belongs to the ManagedConnection
+                # made by _test_connection(); keep it out of the cleanup.
+                del sockets[container.sock]
+                return 1
+            return 0
+        finally:
+            # close all the other sockets
+            for s in sockets.keys():
+                s.close()
+
+    def _connect_ex(self, s, addr):
+        """Call s.connect_ex(addr) and return true if loop should continue.
+
+        We have to handle several possible return values from
+        connect_ex().  If the socket is connected and the initial ZEO
+        setup works, we're done.  Report success by raising an
+        exception.  Yes, this is odd, but we need to bail out of the
+        select() loop in the caller and an exception is a principled
+        way to do the abort.
 
-    def _wait(self, t):
-        time.sleep(t)
-        t = t * 2
-        if t > self.tmax:
-            t = self.tmax
-        return t
+        If the socket connects and the initial ZEO setup fails, or if
+        connect_ex() reports an error (or raises socket.error), the
+        socket is abandoned and None is returned.
 
-    def notify_closed(self, conn):
-        self.connected = 0
-        self.obj.notifyDisconnected(None)
-        if not self.closed:
-            self.connect()
+        If the connect is still in progress (EINPROGRESS, EALREADY or
+        EWOULDBLOCK), return 1 so the caller tries again later.  EISCONN
+        counts as success, like 0: a repeated connect_ex() on a socket
+        whose non-blocking connect has completed may report it.
+        """
+
+        try:
+            e = s.connect_ex(addr)
+        except socket.error, msg:
+            # connect_ex() returns most failures as an errno, but some
+            # (e.g. host not found) are still raised as exceptions.
+            if __debug__:
+                zLOG.LOG(_label, zLOG.DEBUG,
+                         "error connecting to %s: %s" % (addr, msg))
+            s.close()
+            return None
+        if e in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
+            return 1
+        elif e in (0, errno.EISCONN):
+            c = self._test_connection(s, addr)
+            if c:
+                self.connected = 1
+                raise self.Connected(s)
+        else:
+            if __debug__:
+                zLOG.LOG(_label, zLOG.DEBUG,
+                         "error connecting to %s: %s" % (
+                             addr, errno.errorcode.get(e, e)))
+            s.close()
+
+    def _test_connection(self, s, addr):
+        """Offer a freshly connected socket to the registered object.
+
+        Wraps s in a ManagedConnection and passes it to
+        self.obj.notifyConnected().  On success the connection is saved
+        in self.connection and 1 is returned.  If notifyConnected()
+        raises, the failure is logged, the connection is closed and 0 is
+        returned so the caller can move on to the remaining addresses.
+        """
+        c = ManagedConnection(s, addr, self.obj, self)
+        try:
+            self.obj.notifyConnected(c)
+            self.connection = c
+            return 1
+        except:
+            # Deliberately broad: any failure means this server can't
+            # be used.  Record why instead of silently dropping it.
+            zLOG.LOG(_label, zLOG.ERROR,
+                     "error in notifyConnected() for %s" % repr(addr),
+                     error=sys.exc_info())
+            c.close()
+        return 0
 
 class ManagedServerConnection(ServerConnection):
     """A connection that notifies its ConnectionManager of closing"""