[Zodb-checkins] CVS: ZODB3/ZEO/zrpc - client.py:1.16 connection.py:1.32

Guido van Rossum guido@python.org
Fri, 20 Sep 2002 13:37:35 -0400


Update of /cvs-repository/ZODB3/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv26301/ZEO/zrpc

Modified Files:
	client.py connection.py 
Log Message:
I set out making wait=1 work for fallback connections, i.e. the
ClientStorage constructor called with both wait=1 and
read_only_fallback=1 should return, indicating its readiness, when a
read-only connection was made.  This is done by calling
connect(sync=1).  Previously this waited for the ConnectThread to
finish, but that thread doesn't finish until it's made a read-write
connection, so a different mechanism is needed.

I ended up doing a major overhaul of the interfaces between
ClientStorage, ConnectionManager, ConnectThread/ConnectWrapper, and
even ManagedConnection.  Changes:

ClientStorage.py:

  ClientStorage:

  - testConnection() now returns just the preferred flag; stubs are
    cheap and I like to have the notifyConnected() signature be the
    same for clients and servers.

  - notifyConnected() now takes a connection (to match the signature
    of this method in StorageServer), and creates a new stub.  It also
    takes care of the reconnect business if the client was already
    connected, rather than the ClientManager.  It stores the
    connection as self._connection so it can close the previous one.
    This is also reset by notifyDisconnected().

zrpc/client.py:

  ConnectionManager:

  - Changed self.thread_lock into a condition variable.  It now also
    protects self.connection.  The condition is notified when
    self.connection is set to a non-None value in connect_done();
    connect(sync=1) waits for it.  The self.connected variable is no
    more; we test "self.connection is not None" instead.

  - Tried to made close() reentrant.  (There's a trick: you can't set
    self.connection to None, conn.close() ends up calling close_conn()
    which does this.)

  - Renamed notify_closed() to close_conn(), for symmetry with the
    StorageServer API.

  - Added an is_connected() method so ConnectThread.try_connect()
    doesn't have to dig inside the manager's guts to find out if the
    manager is connected (important for the disposition of fallback
    wrappers).

  ConnectThread and ConnectWrapper:

  - Follow above changes in the ClientStorage and ConnectionManager
    APIs: don't close the manager's connection when reconnecting, but
    leave that up to notifyConnected(); ConnectWrapper no longer
    manages the stub.

  - ConnectWrapper sets self.sock to None once it's created a
    ManagedConnection -- from there on the connection is is charge of
    closing the socket.

zrpc/connection.py:

  ManagedServerConnection:
  
  - Changed the order in which close() calls things; super_close()
    should be last.

  ManagedConnection:

  - Ditto, and call the manager's close_conn() instead of
    notify_closed().

tests/testZEO.py:

  - In checkReconnectSwitch(), we can now open the client storage with
    wait=1 and read_only_fallback=1.



=== ZODB3/ZEO/zrpc/client.py 1.15 => 1.16 ===
--- ZODB3/ZEO/zrpc/client.py:1.15	Wed Sep 18 23:51:23 2002
+++ ZODB3/ZEO/zrpc/client.py	Fri Sep 20 13:37:34 2002
@@ -36,13 +36,12 @@
         self.client = client
         self.tmin = tmin
         self.tmax = tmax
-        self.connected = 0
-        self.connection = None
+        self.cond = threading.Condition(threading.Lock())
+        self.connection = None # Protected by self.cond
         self.closed = 0
         # If thread is not None, then there is a helper thread
-        # attempting to connect.  thread is protected by thread_lock.
-        self.thread = None
-        self.thread_lock = threading.Lock()
+        # attempting to connect.
+        self.thread = None # Protected by self.cond
         self.trigger = None
         self.thr_async = 0
         ThreadedAsync.register_loop_callback(self.set_async)
@@ -85,21 +84,26 @@
     def close(self):
         """Prevent ConnectionManager from opening new connections"""
         self.closed = 1
-        self.thread_lock.acquire()
+        self.cond.acquire()
         try:
             t = self.thread
+            self.thread = None
+            conn = self.connection
         finally:
-            self.thread_lock.release()
+            self.cond.release()
         if t is not None:
             log("CM.close(): stopping and joining thread")
             t.stop()
             t.join(30)
             if t.isAlive():
-                log("CM.close(): self.thread.join() timed out")
-        if self.connection:
-            self.connection.close()
+                log("CM.close(): self.thread.join() timed out",
+                    level=zLOG.WARNING)
+        if conn is not None:
+            # This will call close_conn() below which clears self.connection
+            conn.close()
         if self.trigger is not None:
             self.trigger.close()
+            self.trigger = None
 
     def set_async(self, map):
         # This is the callback registered with ThreadedAsync.  The
@@ -131,23 +135,36 @@
         finishes quickly.
         """
 
-        # XXX will a single attempt take too long?
+        # XXX Will a single attempt take too long?
+        # XXX Answer: it depends -- normally, you'll connect or get a
+        # connection refused error very quickly.  Packet-eating
+        # firewalls and other mishaps may cause the connect to take a
+        # long time to time out though.  It's also possible that you
+        # connect quickly to a slow server, and the attempt includes
+        # at least one roundtrip to the server (the register() call).
+        # But that's as fast as you can expect it to be.
         self.connect()
-        self.thread_lock.acquire()
+        self.cond.acquire()
         try:
             t = self.thread
+            conn = self.connection
         finally:
-            self.thread_lock.release()
-        if t is not None:
+            self.cond.release()
+        if t is not None and conn is None:
             event = t.one_attempt
             event.wait()
-        return self.connected
+            self.cond.acquire()
+            try:
+                conn = self.connection
+            finally:
+                self.cond.release()
+        return conn is not None
 
     def connect(self, sync=0):
-        if self.connected == 1:
-            return
-        self.thread_lock.acquire()
+        self.cond.acquire()
         try:
+            if self.connection is not None:
+                return
             t = self.thread
             if t is None:
                 log("CM.connect(): starting ConnectThread")
@@ -155,37 +172,51 @@
                                                 self.addrlist,
                                                 self.tmin, self.tmax)
                 t.start()
+            if sync:
+                while self.connection is None:
+                    self.cond.wait(30)
+                    if self.connection is None:
+                        log("CM.connect(sync=1): still waiting...")
         finally:
-            self.thread_lock.release()
+            self.cond.release()
         if sync:
-            t.join(30)
-            while t.isAlive():
-                log("CM.connect(sync=1): thread join timed out")
-                t.join(30)
+            assert self.connection is not None
 
     def connect_done(self, conn, preferred):
+        # Called by ConnectWrapper.notify_client() after notifying the client
         log("CM.connect_done(preferred=%s)" % preferred)
-        self.connected = 1
-        self.connection = conn
-        if preferred:
-            self.thread_lock.acquire()
-            try:
+        self.cond.acquire()
+        try:
+            self.connection = conn
+            if preferred:
                 self.thread = None
-            finally:
-                self.thread_lock.release()
+            self.cond.notifyAll() # Wake up connect(sync=1)
+        finally:
+            self.cond.release()
 
-    def notify_closed(self, conn):
-        if conn is not self.connection:
-            # Closing a non-current connection
-            log("CM.notify_closed() non-current", level=zLOG.BLATHER)
-            return
-        log("CM.notify_closed()")
-        self.connected = 0
-        self.connection = None
+    def close_conn(self, conn):
+        # Called by the connection when it is closed
+        self.cond.acquire()
+        try:
+            if conn is not self.connection:
+                # Closing a non-current connection
+                log("CM.close_conn() non-current", level=zLOG.BLATHER)
+                return
+            log("CM.close_conn()")
+            self.connection = None
+        finally:
+            self.cond.release()
         self.client.notifyDisconnected()
         if not self.closed:
             self.connect()
 
+    def is_connected(self):
+        self.cond.acquire()
+        try:
+            return self.connection is not None
+        finally:
+            self.cond.release()
+
 # When trying to do a connect on a non-blocking socket, some outcomes
 # are expected.  Set _CONNECT_IN_PROGRESS to the errno value(s) expected
 # when an initial connect can't complete immediately.  Set _CONNECT_OK
@@ -207,20 +238,20 @@
 
     The thread is passed a ConnectionManager and the manager's client
     as arguments.  It calls testConnection() on the client when a
-    socket connects; that should return a tuple (stub, score) where
-    stub is an RPC stub, and score is 1 or 0 depending on whether this
+    socket connects; that should return 1 or 0 indicating whether this
     is a preferred or a fallback connection.  It may also raise an
     exception, in which case the connection is abandoned.
 
     The thread will continue to run, attempting connections, until a
-    preferred stub is seen or until all sockets have been tried.
+    preferred connection is seen or until all sockets have been tried.
 
-    As soon as testConnection() returns a preferred stub, or after all
-    sockets have been tried and at least one fallback stub has been
-    seen, notifyConnected(stub) is called on the client and
-    connect_done() on the manager.  If this was a preferred stub, the
-    thread then exits; otherwise, it keeps trying until it gets a
-    preferred stub, and then reconnects the client using that stub.
+    As soon as testConnection() finds a preferred connection, or after
+    all sockets have been tried and at least one fallback connection
+    has been seen, notifyConnected(connection) is called on the client
+    and connect_done() on the manager.  If this was a preferred
+    connection, the thread then exits; otherwise, it keeps trying
+    until it gets a preferred connection, and then reconnects the
+    client using that connection.
 
     """
 
@@ -248,6 +279,7 @@
 
     def run(self):
         delay = self.tmin
+        success = 0
         while not self.stopped:
             success = self.try_connecting()
             if not self.one_attempt.isSet():
@@ -315,10 +347,10 @@
                     del wrappers[wrap]
 
         # If we've got wrappers left at this point, they're fallback
-        # connections.  Try notifying then until one succeeds.
+        # connections.  Try notifying them until one succeeds.
         for wrap in wrappers.keys():
             assert wrap.state == "tested" and wrap.preferred == 0
-            if self.mgr.connected:
+            if self.mgr.is_connected():
                 wrap.close()
             else:
                 wrap.notify_client()
@@ -356,7 +388,6 @@
         self.state = "closed"
         self.sock = None
         self.conn = None
-        self.stub = None
         self.preferred = 0
         log("CW: attempt to connect to %s" % repr(addr))
         try:
@@ -402,8 +433,9 @@
         """
         self.conn = ManagedConnection(self.sock, self.addr,
                                       self.client, self.mgr)
+        self.sock = None # The socket is now owned by the connection
         try:
-            (self.stub, self.preferred) = self.client.testConnection(self.conn)
+            self.preferred = self.client.testConnection(self.conn)
             self.state = "tested"
         except ReadOnlyError:
             log("CW: ReadOnlyError in testConnection (%s)" % repr(self.addr))
@@ -422,16 +454,12 @@
 
         If this succeeds, call the manager's connect_done().
 
-        If the client is already connected, we assume it's a fallbac
-        connection, the new stub must be a preferred stub, and we
-        first disconnect the client.
+        If the client is already connected, we assume it's a fallback
+        connection, and the new connection must be a preferred
+        connection.  The client will close the old connection.
         """
-        if self.mgr.connected:
-            assert self.preferred
-            log("CW: reconnecting client to preferred stub")
-            self.mgr.connection.close()
         try:
-            self.client.notifyConnected(self.stub)
+            self.client.notifyConnected(self.conn)
         except:
             log("CW: error in notifyConnected (%s)" % repr(self.addr),
                 level=zLOG.ERROR, error=sys.exc_info())
@@ -443,7 +471,7 @@
     def close(self):
         """Close the socket and reset everything."""
         self.state = "closed"
-        self.stub = self.mgr = self.client = None
+        self.mgr = self.client = None
         self.preferred = 0
         if self.conn is not None:
             # Closing the ZRPC connection will eventually close the


=== ZODB3/ZEO/zrpc/connection.py 1.31 => 1.32 ===
--- ZODB3/ZEO/zrpc/connection.py:1.31	Wed Sep 18 23:51:23 2002
+++ ZODB3/ZEO/zrpc/connection.py	Fri Sep 20 13:37:34 2002
@@ -427,8 +427,8 @@
 
     def close(self):
         self.obj.notifyDisconnected()
-        self.__super_close()
         self.__mgr.close_conn(self)
+        self.__super_close()
 
 class ManagedConnection(Connection):
     """Client-side Connection subclass."""
@@ -469,5 +469,5 @@
         return self.check_mgr_async()
 
     def close(self):
+        self.__mgr.close_conn(self)
         self.__super_close()
-        self.__mgr.notify_closed(self)