[Zodb-checkins] SVN: ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py Began reimplementing loadBlob:

Jim Fulton jim at zope.com
Wed May 16 07:13:19 EDT 2007


Log message for revision 75802:
  Began reimplementing loadBlob:
  
  - Don't download if we have a shared blob directory with the server
  
  - Use file locking to allow coordination among multiple clients
  
  - Request that the server send it to us.  We now wait passively for
    the send.  This allows the server to use an iterator to send it to
    us to avoid round trips and ballooning memory.
  
  I still need to implement the sending mechanism, but this should be
  substantially like storeBlob, which is already done.
  

Changed:
  U   ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py

-=-
Modified: ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py	2007-05-16 11:09:43 UTC (rev 75801)
+++ ZODB/branches/jim-zeo-blob/src/ZEO/ClientStorage.py	2007-05-16 11:13:19 UTC (rev 75802)
@@ -35,6 +35,7 @@
 from ZEO.auth import get_module
 from ZEO.zrpc.client import ConnectionManager
 
+import ZODB.lock_file
 from ZODB import POSException
 from ZODB import utils
 from ZODB.loglevels import BLATHER
@@ -916,97 +917,91 @@
             oid, serial, data,
             os.path.basename(target), version, id(txn))
 
-    def _do_load_blob(self, oid, serial, version):
-        """Do the actual loading from the RPC server."""
-        blob_filename = self.fshelper.getBlobFilename(oid, serial)
-        if self._server is None:
-            raise ClientDisconnected()
+    def _have_blob(self, blob_filename, oid, serial):
+        """Return True (logging a cache hit) if blob_filename exists."""
+        if os.path.exists(blob_filename):
+            log2("Found blob %s/%s in cache." % (utils.oid_repr(oid),
+                utils.tid_repr(serial)), level=BLATHER)
+            return True
+        return False
+
+    def loadBlob(self, oid, serial):
 
-        targetpath = self.fshelper.getPathForOID(oid)
-        if not os.path.exists(targetpath):
-            os.makedirs(targetpath, 0700)
-
-        # We write to a temporary file first, so we do not accidentally 
-        # allow half-baked copies of this blob be loaded
-        tempfd, tempfilename = self.fshelper.blob_mkstemp(oid, serial)
-        tempfile = os.fdopen(tempfd, 'wb')
-
-        offset = 0
-        while True:
-            chunk = self._server.loadBlob(oid, serial, version, offset)
-            if not chunk:
-                break
-            offset += len(chunk)
-            tempfile.write(chunk)
-
-        tempfile.close()
-        # XXX will fail on Windows if file is open
-        os.rename(tempfilename, blob_filename)
-        return blob_filename
-
-    def loadBlob(self, oid, serial, version):
-        """Loading a blob has to know about loading the same blob
-           from another thread as the same time.
-
-            1. Check if the blob is downloaded already
-            2. Check whether it is currently beeing downloaded
-            2a. Wait for other download to finish, return 
-            3. If not beeing downloaded, start download
-        """
+        # Load a blob.  If it isn't present and we have a shared blob
+        # directory, then assume that it doesn't exist on the server
+        # and return None.
         if self.fshelper is None:
             raise POSException.Unsupported("No blob cache directory is "
                                            "configured.")
 
         blob_filename = self.fshelper.getBlobFilename(oid, serial)
         # Case 1: Blob is available already, just use it
-        if os.path.exists(blob_filename):
-            log2("Found blob %s/%s in cache." % (utils.oid_repr(oid),
-                utils.tid_repr(serial)), level=BLATHER)
+        if self._have_blob(blob_filename, oid, serial):
             return blob_filename
 
-        # Case 2,3: Blob might still be downloading or not there yet
+        if self.blob_cache_writable:
+            # We're using a server shared cache.  If the file isn't
+            # here, it's not anywhere.
+            return None
 
-        # Try to get or create a lock for the downloading of this blob, 
-        # identified by it's oid and serial
-        lock_key = (oid, serial)
-        
-        # We need to make the check for an existing lock and the possible
-        # creation of a new one atomic, so there is another lock:
-        self.blob_status_lock.acquire()
+        # OK, it's not here and we (or someone) needs to get it.  We
+        # want to avoid getting it multiple times.  We want to avoid
+        # getting it multiple times even across separate client
+        # processes on the same machine. We'll use file locking.
+
         try:
-            if not self.blob_status.has_key(oid):
-                self.blob_status[lock_key] = self.getBlobLock()
-            lock = self.blob_status[lock_key]
-        finally:
-            self.blob_status_lock.release()
+            lock = ZODB.lock_file.LockFile(blob_filename+'.lock')
+        except Exception:
+            # TODO: fix LockFile so that it raises consistent
+            # exceptions across platforms.
 
-        # We acquire the lock to either start downloading, or wait
-        # for another download to finish
-        lock.acquire()
+            # Someone is already downloading the Blob. Wait for the
+            # lock to be freed.  How long should we be willing to wait?
+            # TODO: maybe find some way to assess download progress.
+
+            while 1:
+                time.sleep(0.1)
+                try:
+                    lock = ZODB.lock_file.LockFile(blob_filename+'.lock')
+                except Exception:
+                    pass
+                else:
+                    # We have the lock, so the other download is over.
+                    lock.close()
+                    break
+
+                if self._have_blob(blob_filename, oid, serial):
+                    return blob_filename
+
+            # Leaving the loop via `break` skips the check above, so look
+            # for the file once more now that the downloader is done.
+            if self._have_blob(blob_filename, oid, serial):
+                return blob_filename
+
+            raise AssertionError("Can't find downloaded blob file.")
 
         try:
-            # If there was another download that is finished by now,
-            # we just take the result.
-            if os.path.exists(blob_filename):
-                log2("Found blob %s/%s in cache after it was downloaded "
-                     "from another thread." % (utils.oid_repr(oid),
-                     utils.tid_repr(serial)), level=BLATHER)
+            # We got the lock, so it's our job to download it.  First,
+            # we'll double check that someone didn't download it while we
+            # were getting the lock:
+
+            if self._have_blob(blob_filename, oid, serial):
                 return blob_filename
 
-            # Otherwise we download and use that
-            return self._do_load_blob(oid, serial, version)
-        finally:
-            # When done we remove the download lock ...
-            lock.release()
+            # Ask the server to send it to us.  When this function
+            # returns, it will have been sent. (The receiving will
+            # have been handled by the asyncore thread.)
+
+            self._server.sendBlob(oid, serial)
 
-            # And the status information isn't needed as well,
-            # but we have to use the second lock here as well, to avoid
-            # making the creation of this status lock non-atomic (see above)
-            self.blob_status_lock.acquire()
-            try:
-                del self.blob_status[lock_key]
-            finally:
-                self.blob_status_lock.release()
+            if self._have_blob(blob_filename, oid, serial):
+                return blob_filename
 
+            raise AssertionError("Can't find downloaded blob file.")
+
+        finally:
+            lock.close()
+
     def getBlobLock(self):
         # indirection to support unit testing
         return threading.Lock()



More information about the Zodb-checkins mailing list