[Zope-Checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.101

Jeremy Hylton jeremy@zope.com
Thu, 5 Jun 2003 18:38:35 -0400


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv13384/ZEO

Modified Files:
	ClientStorage.py 
Log Message:
Merge atomic cache invalidation code from the 3.1 release branch.


=== ZODB3/ZEO/ClientStorage.py 1.100 => 1.101 ===
--- ZODB3/ZEO/ClientStorage.py:1.100	Fri May 30 15:20:57 2003
+++ ZODB3/ZEO/ClientStorage.py	Thu Jun  5 18:38:35 2003
@@ -268,6 +268,12 @@
         self._oid_lock = threading.Lock()
         self._oids = [] # Object ids retrieved from new_oids()
 
+        # Can't read data in one thread while writing data
+        # (tpc_finish) in another thread.  In general, the lock
+        # must prevent access to the cache while _update_cache
+        # is executing.
+        self._lock = threading.Lock()
+
         t = self._ts = get_timestamp()
         self._serial = `t`
         self._oid = '\0\0\0\0\0\0\0\0'
@@ -688,11 +694,19 @@
         specified by the given object id and version, if they exist;
         otherwise a KeyError is raised.
         """
-        p = self._cache.load(oid, version)
-        if p:
-            return p
+        self._lock.acquire()    # for atomic processing of invalidations
+        try:
+            p = self._cache.load(oid, version)
+            if p:
+                return p
+        finally:
+            self._lock.release()
+            
         if self._server is None:
             raise ClientDisconnected()
+        
+        # If an invalidation for oid comes in during zeoLoad, that's OK
+        # because we'll get oid's new state.
         p, s, v, pv, sv = self._server.zeoLoad(oid)
         self._cache.checkSize(0)
         self._cache.store(oid, p, s, v, pv, sv)
@@ -708,9 +722,13 @@
 
         If no version modified the object, return an empty string.
         """
-        v = self._cache.modifiedInVersion(oid)
-        if v is not None:
-            return v
+        self._lock.acquire()
+        try:
+            v = self._cache.modifiedInVersion(oid)
+            if v is not None:
+                return v
+        finally:
+            self._lock.release()
         return self._server.modifiedInVersion(oid)
 
     def new_oid(self):
@@ -847,16 +865,20 @@
         if transaction is not self._transaction:
             return
         try:
+            self._lock.acquire()  # for atomic processing of invalidations
+            try:
+                self._update_cache()
+            finally:
+                self._lock.release()
+                
             if f is not None:
                 f()
 
             tid = self._server.tpc_finish(self._serial)
+            self._cache.setLastTid(tid)
 
             r = self._check_serials()
             assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
-
-            self._update_cache()
-            self._cache.setLastTid(tid)
         finally:
             self.end_transaction()
 
@@ -866,6 +888,8 @@
         This iterates over the objects in the transaction buffer and
         update or invalidate the cache.
         """
+        # Must be called with _lock already acquired.
+        
         self._cache.checkSize(self._tbuf.get_size())
         try:
             self._tbuf.begin_iterate()
@@ -912,10 +936,13 @@
         """Storage API: undo a transaction, writing directly to the storage."""
         if self._is_read_only:
             raise POSException.ReadOnlyError()
-        # XXX what are the sync issues here?
         oids = self._server.undo(transaction_id)
-        for oid in oids:
-            self._cache.invalidate(oid, '')
+        self._lock.acquire()
+        try:
+            for oid in oids:
+                self._cache.invalidate(oid, '')
+        finally:
+            self._lock.release()
         return oids
 
     def undoInfo(self, first=0, last=-20, specification=None):
@@ -969,15 +996,19 @@
         # oid, version pairs.  The DB's invalidate() method expects a
         # dictionary of oids.
         
-        # versions maps version names to dictionary of invalidations
-        versions = {}
-        for oid, version in invs:
-            d = versions.setdefault(version, {})
-            self._cache.invalidate(oid, version=version)
-            d[oid] = 1
-        if self._db is not None:
-            for v, d in versions.items():
-                self._db.invalidate(d, version=v)
+        self._lock.acquire()
+        try:
+            # versions maps version names to dictionary of invalidations
+            versions = {}
+            for oid, version in invs:
+                d = versions.setdefault(version, {})
+                self._cache.invalidate(oid, version=version)
+                d[oid] = 1
+            if self._db is not None:
+                for v, d in versions.items():
+                    self._db.invalidate(d, version=v)
+        finally:
+            self._lock.release()
 
     def endVerify(self):
         """Server callback to signal end of cache validation."""