[Zodb-checkins] SVN: ZODB/branches/3.8/ Refactored handling of invalidations on ZEO clients to fix

Jim Fulton jim at zope.com
Fri Sep 5 14:55:49 EDT 2008


Log message for revision 90892:
  Refactored handling of invalidations on ZEO clients to fix
  a possible ordering problem for invalidation messages.
  
  This was motivated by code inspection during merge of an earlier
  refactoring to the trunk.  The refactoring also simplifies the code
  and probably makes it a tad faster.
  

Changed:
  U   ZODB/branches/3.8/NEWS.txt
  U   ZODB/branches/3.8/src/ZEO/ClientStorage.py

-=-
Modified: ZODB/branches/3.8/NEWS.txt
===================================================================
--- ZODB/branches/3.8/NEWS.txt	2008-09-05 18:54:25 UTC (rev 90891)
+++ ZODB/branches/3.8/NEWS.txt	2008-09-05 18:55:49 UTC (rev 90892)
@@ -4,6 +4,9 @@
 
 Bugs Fixed:
 
+- (beta 8) Refactored handling of invalidations on ZEO clients to fix
+  a possible ordering problem for invalidation messages.
+
 - (beta 8) An ZEO cache internal data structure can get out of sync
   with the data in a way that prevents data from being loaded into the
   cache. We don't yet know why, but added an exception handler to

Modified: ZODB/branches/3.8/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/3.8/src/ZEO/ClientStorage.py	2008-09-05 18:54:25 UTC (rev 90891)
+++ ZODB/branches/3.8/src/ZEO/ClientStorage.py	2008-09-05 18:55:49 UTC (rev 90892)
@@ -1240,11 +1240,9 @@
         # Invalidation as result of verify_cache().
         # Queue an invalidate for the end the verification procedure.
         if self._pickler is None:
-            # This should never happen.  TODO:  assert it doesn't, or log
-            # if it does.
+            log2("invalidateVerify with no _pickler", level=logging.ERROR)
             return
-        oid, version = args
-        self._pickler.dump((oid, version, None))
+        self._pickler.dump((None, [args]))
 
     def endVerify(self):
         """Server callback to signal end of cache validation."""
@@ -1258,32 +1256,26 @@
         try:
             if catch_up:
                 # process catch-up invalidations
-                tid, invalidations = catch_up
-                self._process_invalidations(
-                    (oid, version, tid)
-                    for oid, version in invalidations
-                    )
+                self._process_invalidations(*catch_up)
             
             if self._pickler is None:
                 return
             # write end-of-data marker
-            self._pickler.dump((None, None, None))
+            self._pickler.dump((None, None))
             self._pickler = None
             self._tfile.seek(0)
             unpickler = cPickle.Unpickler(self._tfile)
             min_tid = self._cache.getLastTid()
-            def InvalidationLogIterator():
-                while 1:
-                    oid, version, tid = unpickler.load()
-                    if oid is None:
-                        break
-                    if ((tid is None)
-                        or (min_tid is None)
-                        or (tid > min_tid)
-                        ):
-                        yield oid, version, tid
+            while 1:
+                tid, invalidations = unpickler.load()
+                if invalidations is None:
+                    break
+                if ((tid is None)
+                    or (min_tid is None)
+                    or (tid > min_tid)
+                    ):
+                    self._process_invalidations(tid, invalidations)
 
-            self._process_invalidations(InvalidationLogIterator())
             self._tfile.close()
             self._tfile = None
         finally:
@@ -1301,34 +1293,32 @@
             if self._pickler is not None:
                 log2("Transactional invalidation during cache verification",
                      level=BLATHER)
-                for oid, version in args:
-                    self._pickler.dump((oid, version, tid))
+                self._pickler.dump((tid, args))
                 return
-            self._process_invalidations([(oid, version, tid)
-                                         for oid, version in args])
+            self._process_invalidations(tid, args)
         finally:
             self._lock.release()
 
-    def _process_invalidations(self, invs):
+    def _process_invalidations(self, tid, invs):
         # Invalidations are sent by the ZEO server as a sequence of
         # oid, version, tid triples.  The DB's invalidate() method expects a
         # dictionary of oids.
 
         # versions maps version names to dictionary of invalidations
         versions = {}
-        for oid, version, tid in invs:
+        for oid, version in invs:
             if oid == self._load_oid:
                 self._load_status = 0
             self._cache.invalidate(oid, version, tid)
-            oids = versions.get((version, tid))
+            oids = versions.get(version)
             if not oids:
-                versions[(version, tid)] = [oid]
+                versions[version] = [oid]
             else:
                 oids.append(oid)
 
         if self._db is not None:
-            for (version, tid), d in versions.items():
-                self._db.invalidate(tid, d, version=version)
+            for version, oids in versions.items():
+                self._db.invalidate(tid, oids, version=version)
 
     # The following are for compatibility with protocol version 2.0.0
 



More information about the Zodb-checkins mailing list