[Zope-Checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.83 ServerStub.py:1.12

Jeremy Hylton jeremy@zope.com
Tue, 14 Jan 2003 14:09:05 -0500


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv17853

Modified Files:
	ClientStorage.py ServerStub.py 
Log Message:
Prevent client from using stale cache data while connecting.

XXX Maybe there should be an option to allow this.

A ZEO client can run in disconnected mode, using data from
its cache, or in connected mode.  Several instance variables
are related to whether the client is connected.

_server: All method calls are invoked through the server
   stub.  When not connected, it is set to disconnected_stub, an
   object that raises ClientDisconnected errors.

_ready: A threading Event that is set only if _server
   is set to a real stub.

_connection: The current zrpc connection or None.

_connection is set as soon as a connection is established,
but _server is set only after cache verification has finished
and clients can safely use the server.  _pending_server holds
a server stub while it is being verified.

Before this change, a client could start using a connection before
verification finished.  If verification took a long time, it could
even commit a new transaction using a mix of old and new data.


=== ZODB3/ZEO/ClientStorage.py 1.82 => 1.83 ===
--- ZODB3/ZEO/ClientStorage.py:1.82	Tue Jan  7 17:34:06 2003
+++ ZODB3/ZEO/ClientStorage.py	Tue Jan 14 14:08:33 2003
@@ -202,11 +202,33 @@
             wait = 1
 
         self._addr = addr # For tests
+
+        # A ZEO client can run in disconnected mode, using data from
+        # its cache, or in connected mode.  Several instance variables
+        # are related to whether the client is connected.
+
+        # _server: All method calls are invoked through the server
+        #    stub.  When not connected, set to disconnected_stub, an
+        #    object that raises ClientDisconnected errors.
+
+        # _ready: A threading Event that is set only if _server
+        #    is set to a real stub.
+
+        # _connection: The current zrpc connection or None.
+
+        # _connection is set as soon as a connection is established,
+        # but _server is set only after cache verification has finished
+        # and clients can safely use the server.  _pending_server holds
+        # a server stub while it is being verified.
+        
         self._server = disconnected_stub
+        self._connection = None
+        self._pending_server = None
+        self._ready = threading.Event()
+        
         self._is_read_only = read_only
         self._storage = storage
         self._read_only_fallback = read_only_fallback
-        self._connection = None
         # _server_addr is used by sortKey()
         self._server_addr = None
         self._tfile = None
@@ -257,15 +279,45 @@
                                                     tmax=max_disconnect_poll)
 
         if wait:
-            self._rpc_mgr.connect(sync=1)
+            self._wait()
         else:
+            # attempt_connect() will make an attempt that doesn't block
+            # "too long," for a very vague notion of too long.  If that
+            # doesn't succeed, call connect() to start a thread.
             if not self._rpc_mgr.attempt_connect():
                 self._rpc_mgr.connect()
+            # If the connection isn't ready yet, run with cached data.
+            if not self._ready.isSet():
+                self._cache.open()
 
-        # If we're connected at this point, the cache is opened as a
-        # side effect of verify_cache().  If not, open it now.
-        if not self.is_connected():
-            self._cache.open()
+    def _wait(self):
+        # Block until a connection is established and ready (_ready set).
+        self._rpc_mgr.connect(sync=1)
+        # When a synchronous connect() call returns, there is
+        # a valid _connection object but cache validation may
+        # still be going on.  This code must wait until validation
+        # finishes, but if the connection isn't a zrpc async
+        # connection it also needs to poll for input.
+        if self._connection.is_async():
+            while 1:
+                self._ready.wait(30)
+                if self._ready.isSet():
+                    break
+                log2(INFO, "Waiting to connect to server")
+        else:
+            # If there is no mainloop running, this code must call the
+            # connection's pending() method to handle incoming messages.
+            while 1:
+                cn = self._connection
+                if cn is None:
+                    # If the connection was closed while we were
+                    # waiting for it to become ready, start over.
+                    return self._wait()
+                else:
+                    cn.pending(30)
+                if self._ready.isSet():
+                    break
+                log2(INFO, "Waiting to connect to server")
 
     def close(self):
         """Storage API: finalize the storage, releasing external resources."""
@@ -288,17 +340,22 @@
 
     def is_connected(self):
         """Return whether the storage is currently connected to a server."""
-        if self._server is disconnected_stub:
-            return 0
-        else:
-            return 1
+        # Clients use this, so only report a connection once it is ready
+        # to use, i.e. after cache verification has finished.
+        return self._ready.isSet()
 
     def sync(self):
         """Handle any pending invalidation messages.
 
         This is called by the sync method in ZODB.Connection.
         """
-        self._server._update()
+        # With no connection there are no pending invalidations, so return
+        # immediately rather than raising.  Otherwise ask the connection to
+        # handle any pending incoming messages.
+        
+        cn = self._connection
+        if cn is not None:
+            cn.pending()
 
     def testConnection(self, conn):
         """Internal: test the given connection.
@@ -346,23 +403,24 @@
         else:
             reconnect = 0
         self.set_server_addr(conn.get_addr())
-        stub = self.StorageServerStubClass(conn)
-        stub = self.StorageServerStubClass(conn)
-        self._oids = []
-        self._info.update(stub.get_info())
-        self.verify_cache(stub)
 
-        # XXX The stub should be saved here and set in endVerify() below.
+        # If we are upgrading from a read-only fallback connection, close
+        # it so it isn't used while the cache is verified against the new
+        # one.  NOTE(review): confirm _ready is cleared on this path.
         if self._connection is not None:
             self._connection.close()
         self._connection = conn
-        self._server = stub
 
         if reconnect:
             log2(INFO, "Reconnected to storage: %s" % self._server_addr)
         else:
             log2(INFO, "Connected to storage: %s" % self._server_addr)
 
+        stub = self.StorageServerStubClass(conn)
+        self._oids = []
+        self._info.update(stub.get_info())
+        self.verify_cache(stub)
+
     def set_server_addr(self, addr):
         # Normalize server address and convert to string
         if isinstance(addr, types.StringType):
@@ -396,6 +454,11 @@
         The return value (indicating which path we took) is used by
         the test suite.
         """
+
+        # If verify_cache() finishes the cache verification process, it
+        # sets self._server (and self._ready) itself.  If it goes through
+        # full cache verification, endVerify() sets them later.
+        
         last_inval_tid = self._cache.getLastTid()
         if last_inval_tid is not None:
             ltid = server.lastTransaction()
@@ -403,10 +466,12 @@
                 log2(INFO, "No verification necessary "
                      "(last_inval_tid up-to-date)")
                 self._cache.open()
+                self._server = server
+                self._ready.set()
                 return "no verification"
 
             # log some hints about last transaction
             log2(INFO, "last inval tid: %r %s"
                  % (last_inval_tid, tid2time(last_inval_tid)))
             log2(INFO, "last transaction: %r %s" %
                  (ltid, ltid and tid2time(ltid)))
@@ -416,6 +481,8 @@
                 log2(INFO, "Recovering %d invalidations" % len(pair[1]))
                 self._cache.open()
                 self.invalidateTransaction(*pair)
+                self._server = server
+                self._ready.set()
                 return "quick verification"
             
         log2(INFO, "Verifying cache")
@@ -425,6 +492,7 @@
         self._pickler.fast = 1 # Don't use the memo
 
         self._cache.verify(server.zeoVerify)
+        self._pending_server = server
         server.endZeoVerify()
         return "full verification"

@@ -445,6 +513,7 @@
         log2(PROBLEM, "Disconnected from storage: %s"
              % repr(self._server_addr))
         self._connection = None
+        self._ready.clear()
         self._server = disconnected_stub
 
     def __len__(self):
@@ -847,11 +916,19 @@
 
         while 1:
             oid, version = unpick.load()
             if not oid:
                 break
             self._cache.invalidate(oid, version=version)
-            self._db.invalidate(oid, version=version)
+            if self._db is not None:
+                self._db.invalidate(oid, version=version)
         f.close()
+
+        log2(INFO, "endVerify finishing")
+        # Verification is done: install the verified stub and mark ready.
+        self._server = self._pending_server
+        self._ready.set()
+        self._pending_server = None
+        log2(INFO, "endVerify finished")
 
     def invalidateTransaction(self, tid, args):
         """Invalidate objects modified by tid."""


=== ZODB3/ZEO/ServerStub.py 1.11 => 1.12 ===
--- ZODB3/ZEO/ServerStub.py:1.11	Fri Jan  3 17:07:38 2003
+++ ZODB3/ZEO/ServerStub.py	Tue Jan 14 14:08:33 2003
@@ -39,15 +39,6 @@
     def extensionMethod(self, name):
         return ExtensionMethodWrapper(self.rpc, name).call
 
-    def _update(self):
-        """Handle pending incoming messages.
-
-        This method is typically only used when no asyncore mainloop
-        is already active.  It can cause arbitrary callbacks from the
-        server to the client to be handled.
-        """
-        self.rpc.pending()
-
     def register(self, storage_name, read_only):
         self.rpc.call('register', storage_name, read_only)