[Zodb-checkins] CVS: ZODB3/ZEO - StorageServer.py:1.74.2.6.4.6 ServerStub.py:1.9.22.2 ClientStorage.py:1.73.2.7.2.5

Jeremy Hylton jeremy@zope.com
Tue, 17 Dec 2002 15:41:48 -0500


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv16110/ZEO

Modified Files:
      Tag: ZODB3-fast-restart-branch
	StorageServer.py ServerStub.py ClientStorage.py 
Log Message:
Add untested implementation of fast verification.

The server method getInvalidations(tid) returns all the objects
invalidated since tid.  If tid is too old, the server will just return
None.  If not, it will return a pair containing the most recent tid at
the server and a list of oid, version pairs for all objects modified
after tid and up to the returned tid.

The client calls this method to attempt to validate its cache before
doing the normal cache validation.

Inline the beginZeoVerify call in verify_cache() and remove it from
the client-server protocol.

On the server, save invalidations from the last 1000 transactions.



=== ZODB3/ZEO/StorageServer.py 1.74.2.6.4.5 => 1.74.2.6.4.6 ===
--- ZODB3/ZEO/StorageServer.py:1.74.2.6.4.5	Tue Dec 17 14:33:29 2002
+++ ZODB3/ZEO/StorageServer.py	Tue Dec 17 15:41:48 2002
@@ -67,7 +67,8 @@
     ZEOStorageClass = None # patched up later
     ManagedServerConnectionClass = ManagedServerConnection
 
-    def __init__(self, addr, storages, read_only=0):
+    def __init__(self, addr, storages, read_only=0,
+                 invalidation_queue_size=1000):
 
         """StorageServer constructor.
 
@@ -108,6 +109,9 @@
         for s in storages.values():
             s._waiting = []
         self.read_only = read_only
+        # A list of at most invalidation_queue_size invalidations
+        self.invq = []
+        self.invq_bound = invalidation_queue_size
         self.connections = {}
         self.dispatcher = self.DispatcherClass(addr,
                                                factory=self.new_connection,
@@ -162,12 +166,35 @@
           the current client.
 
         """
+        if invalidated:
+            if len(self.invq) >= self.invq_bound:
+                del self.invq[0]
+            self.invq.append((tid, invalidated))
         for p in self.connections.get(storage_id, ()):
             if invalidated and p is not conn:
                 p.client.invalidateTransaction(tid, invalidated)
             elif info is not None:
                 p.client.info(info)
 
+    def get_invalidations(self, tid):
+        """Return a tid and list of all objects invalidation since tid.
+
+        The tid is the most recent transaction id committed by the server.
+
+        Returns None if it is unable to provide a complete list
+        of invalidations for tid.  In this case, client should
+        do full cache verification.
+        """
+        earliest_tid = self.invq[0][0]
+        if earliest_tid > tid:
+            return None, ()
+        oids = {}
+        for tid, L in self.invq:
+            for key in L:
+                oids[key] = 1
+        latest_tid = self.invq[-1][0]
+        return latest_tid, oids.keys()
+
     def close_server(self):
         """Close the dispatcher so that there are no new connections.
 
@@ -321,8 +348,11 @@
                 raise
         return p, s, v, pv, sv
 
-    def beginZeoVerify(self):
-        self.client.beginVerify()
+    def getInvalidations(self, tid):
+        invtid, invlist = self.server.get_invalidations(tid)
+        if invtid is None:
+            return None
+        return invtid, invlist
 
     def zeoVerify(self, oid, s, sv):
         try:


=== ZODB3/ZEO/ServerStub.py 1.9.22.1 => 1.9.22.2 ===
--- ZODB3/ZEO/ServerStub.py:1.9.22.1	Tue Dec 17 13:07:59 2002
+++ ZODB3/ZEO/ServerStub.py	Tue Dec 17 15:41:48 2002
@@ -51,8 +51,8 @@
     def lastTransaction(self):
         return self.rpc.call('lastTransaction')
 
-    def beginZeoVerify(self):
-        self.rpc.callAsync('beginZeoVerify')
+    def getInvalidations(self):
+        return self.rpc.call('getInvalidations')
 
     def zeoVerify(self, oid, s, sv):
         self.rpc.callAsync('zeoVerify', oid, s, sv)


=== ZODB3/ZEO/ClientStorage.py 1.73.2.7.2.4 => 1.73.2.7.2.5 ===
--- ZODB3/ZEO/ClientStorage.py:1.73.2.7.2.4	Tue Dec 17 14:32:08 2002
+++ ZODB3/ZEO/ClientStorage.py	Tue Dec 17 15:41:48 2002
@@ -60,6 +60,9 @@
 class ClientDisconnected(ClientStorageError, Disconnected):
     """The database storage is disconnected from the storage."""
 
+def tid2time(tid):
+    return str(TimeStamp(tid))
+
 def get_timestamp(prev_ts=None):
     """Internal helper to return a unique TimeStamp instance.
 
@@ -394,8 +397,6 @@
 
     def verify_cache(self, server):
         """Internal routine called to verify the cache."""
-        # XXX beginZeoVerify ends up calling back to beginVerify() below.
-        # That whole exchange is rather unnecessary.
         if self._last_inval_tid is not None:
             ltid = server.lastTransaction()
             if ltid == self._last_inval_tid:
@@ -403,12 +404,21 @@
                      "(_last_inval_tid up-to-date)")
                 return # No need to verify the cache
             log2(INFO, "last inval tid: %r %s"
-                 % (self._last_inval_tid,
-                    str(TimeStamp(self._last_inval_tid))))
-            log2(INFO, "last transaction: %r %s"
-                 % (ltid, str(TimeStamp(ltid))))
+                 % (self._last_inval_tid, tid2time(self._last_inval_tid)))
+            log2(INFO, "last transaction: %r %s" % (ltid, tid2time(ltid)))
+
+            pair = server.getInvalidations(self._last_inval_tid)
+            if pair is not None:
+                log2(INFO, "Recovering %d invalidations" % len(pair[1]))
+                self.invalidateTransaction(*pair)
+                return
+            
         log2(INFO, "Verifying cache")
-        server.beginZeoVerify()
+        # setup tempfile to hold zeoVerify results
+        self._tfile = tempfile.TemporaryFile(suffix=".inv")
+        self._pickler = cPickle.Pickler(self._tfile, 1)
+        self._pickler.fast = 1 # Don't use the memo
+
         self._cache.verify(server.zeoVerify)
         server.endZeoVerify()
 
@@ -790,12 +800,6 @@
     def info(self, dict):
         """Server callback to update the info dictionary."""
         self._info.update(dict)
-
-    def beginVerify(self):
-        """Server callback to signal start of cache validation."""
-        self._tfile = tempfile.TemporaryFile(suffix=".inv")
-        self._pickler = cPickle.Pickler(self._tfile, 1)
-        self._pickler.fast = 1 # Don't use the memo
 
     def invalidateVerify(self, args):
         """Server callback to invalidate an (oid, version) pair.