[Zodb-checkins] SVN: ZODB/branches/jim-zeo-blob-cache/src/Z checkpoint

Jim Fulton jim at zope.com
Tue Dec 2 17:51:20 EST 2008


Log message for revision 93556:
  checkpoint

Changed:
  U   ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py
  U   ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/zeo_blob_cache.test
  U   ZODB/branches/jim-zeo-blob-cache/src/ZODB/blob.py

-=-
Modified: ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py	2008-12-02 22:15:03 UTC (rev 93555)
+++ ZODB/branches/jim-zeo-blob-cache/src/ZEO/ClientStorage.py	2008-12-02 22:51:20 UTC (rev 93556)
@@ -26,7 +26,6 @@
 from ZEO import ServerStub
 from ZEO.TransactionBuffer import TransactionBuffer
 from ZEO.zrpc.client import ConnectionManager
-from ZODB.blob import rename_or_copy_blob
 from ZODB import POSException
 from ZODB import utils
 from ZODB.loglevels import BLATHER
@@ -44,6 +43,7 @@
 import weakref
 import zc.lockfile
 import ZEO.interfaces
+import ZODB
 import ZODB.BaseStorage
 import ZODB.interfaces
 import zope.event
@@ -370,8 +370,14 @@
             # Avoid doing this import unless we need it, as it
             # currently requires pywin32 on Windows.
             import ZODB.blob
-            self.fshelper = ZODB.blob.FilesystemHelper(blob_dir)
-            self.fshelper.create()
+            if shared_blob_dir:
+                self.fshelper = ZODB.blob.FilesystemHelper(blob_dir)
+            else:
+                if 'zeocache' not in ZODB.blob.LAYOUTS:
+                    ZODB.blob.LAYOUTS['zeocache'] = BlobCacheLayout()
+                self.fshelper = ZODB.blob.FilesystemHelper(
+                    blob_dir, layout_name='zeocache')
+                self.fshelper.create()
             self.fshelper.checkSecure()
         else:
             self.fshelper = None
@@ -462,7 +468,7 @@
 
     def _check_blob_size_method(self):
         try:
-            lock = zc.lockfile.LockFile(
+            check_lock = zc.lockfile.LockFile(
                 os.path.join(self.blob_dir, 'cache.lock'))
         except zc.lockfile.LockError:
             # Someone is already cleaning up, so don't bother
@@ -470,6 +476,7 @@
 
         try:
            target = self._blob_cache_size
+           size = 0
            tmp = self.temporaryDirectory()
            blob_suffix = ZODB.blob.BLOB_SUFFIX
            files_by_atime = BTrees.IOBTree.BTree()
@@ -482,23 +489,33 @@
                        continue
                    file_name = os.path.join(base, file_name)
                    stat = os.stat(file_name)
-                   target -= stat.st_size
-                   t = max(stat.st_atime, stat.st_mtime)
+                   size += stat.st_size
+                   t = stat.st_atime
                    if t not in files_by_atime:
                        files_by_atime[t] = []
                    files_by_atime[t].append(file_name)
 
-           while target <= 0 and files_by_atime:
+           while size > target and files_by_atime:
                for file_name in files_by_atime.pop(files_by_atime.minKey()):
-                   size = os.stat(file_name).st_size
+                   lockfilename = os.path.join(os.path.dirname(file_name),
+                                               '.lock')
                    try:
-                       os.remove(file_name)
-                   except OSError:
-                       raise
-                   else:
-                       target -= size
+                       lock = zc.lockfile.LockFile(lockfilename)
+                   except zc.lockfile.LockError:
+                       continue  # In use, skip
+
+                   try:
+                       removed_size = os.stat(file_name).st_size
+                       try:
+                           os.remove(file_name)
+                       except OSError:
+                           raise
+                       else:
+                           size -= removed_size
+                   finally:
+                       lock.close()
         finally:
-            lock.close()
+            check_lock.close()
 
 
     def registerDB(self, db):
@@ -953,10 +970,10 @@
             # use a slightly different file name. We keep the old one
             # until we're done to avoid conflicts. Then remove the old name.
             target += 'w'
-            rename_or_copy_blob(filename, target)
+            ZODB.blob.rename_or_copy_blob(filename, target)
             os.remove(target[:-1])
         else:
-            rename_or_copy_blob(filename, target)
+            ZODB.blob.rename_or_copy_blob(filename, target)
 
         # Now tell the server where we put it
         self._server.storeBlobShared(
@@ -965,7 +982,8 @@
     def receiveBlobStart(self, oid, serial):
         blob_filename = self.fshelper.getBlobFilename(oid, serial)
         assert not os.path.exists(blob_filename)
-        assert os.path.exists(blob_filename+'.lock')
+        lockfilename = os.path.join(os.path.dirname(blob_filename), '.lock')
+        assert os.path.exists(lockfilename)
         blob_filename += '.dl'
         assert not os.path.exists(blob_filename)
         f = open(blob_filename, 'wb')
@@ -999,6 +1017,8 @@
         blob_filename = self.fshelper.getBlobFilename(oid, serial)
         # Case 1: Blob is available already, just use it
         if os.path.exists(blob_filename):
+            if not self.shared_blob_dir:
+                _accessed(blob_filename)
             return blob_filename
 
         if self.shared_blob_dir:
@@ -1006,57 +1026,30 @@
             # here, it's not anywhere.
             raise POSException.POSKeyError("No blob file", oid, serial)
 
-        self._blob_download_name = 
-
-
         # First, we'll create the directory for this oid, if it doesn't exist. 
-#        self.fshelper.createPathForOID(oid)
+        self.fshelper.createPathForOID(oid)
 
         # OK, it's not here and we (or someone) needs to get it.  We
         # want to avoid getting it multiple times.  We want to avoid
         # getting it multiple times even across separate client
         # processes on the same machine. We'll use file locking.
 
-        lockfilename = os.path.join(
-            self.blob_dir, (oid+serial).encode(hex)+'.lock')
+        lockfilename = os.path.join(os.path.dirname(blob_filename), '.lock')
+        while 1:
+            try:
+                lock = zc.lockfile.LockFile(lockfilename)
+            except zc.lockfile.LockError:
+                time.sleep(0.01)
+            else:
+                break
 
-
-        lockfilename = blob_filename+'.lock'
         try:
-            lock = zc.lockfile.LockFile(lockfilename)
-        except zc.lockfile.LockError:
-
-            # Someone is already downloading the Blob. Wait for the
-            # lock to be freed.  How long should we be willing to wait?
-            # TODO: maybe find some way to assess download progress.
-
-            while 1:
-                time.sleep(0.1)
-                try:
-                    lock = zc.lockfile.LockFile(lockfilename)
-                except zc.lockfile.LockError:
-                    pass
-                else:
-                    # We have the lock. We should be able to get the file now.
-                    lock.close()
-                    try:
-                        os.remove(lockfilename)
-                    except OSError:
-                        pass
-                    break
-
-            if os.path.exists(blob_filename):
-                return blob_filename
-
-            return None
-
-        try:
             # We got the lock, so it's our job to download it.  First,
             # we'll double check that someone didn't download it while we
             # were getting the lock:
 
             if os.path.exists(blob_filename):
-                return blob_filename
+                return _accessed(blob_filename)
 
             # Ask the server to send it to us.  When this function
             # returns, it will have been sent. (The receiving will
@@ -1065,17 +1058,41 @@
             self._server.sendBlob(oid, serial)
 
             if os.path.exists(blob_filename):
-                return blob_filename
+                return _accessed(blob_filename)
 
             raise POSException.POSKeyError("No blob file", oid, serial)
 
         finally:
             lock.close()
+
+    def openCommittedBlobFile(self, oid, serial, blob):
+        blob_filename = self.fshelper.getBlobFilename(oid, serial)
+        lockfilename = os.path.join(os.path.dirname(blob_filename), '.lock')
+        while 1:
             try:
-                os.remove(lockfilename)
-            except OSError:
-                pass
+                lock = zc.lockfile.LockFile(lockfilename)
+            except zc.lockfile.LockError:
+                time.sleep(.01)
+            else:
+                break
 
+        try:
+            blob_filename = self.fshelper.getBlobFilename(oid, serial)
+            if not os.path.exists(blob_filename):
+                if self.shared_blob_dir:
+                    # We're using a server shared cache.  If the file isn't
+                    # here, it's not anywhere.
+                    raise POSException.POSKeyError("No blob file", oid, serial)
+                self._server.sendBlob(oid, serial)
+                if not os.path.exists(blob_filename):
+                    raise POSException.POSKeyError("No blob file", oid, serial)
+
+            _accessed(blob_filename)
+            return ZODB.blob.BlobFile(blob_filename, 'r', blob)
+        finally:
+            lock.close()
+        
+
     def temporaryDirectory(self):
         return self.fshelper.temp_dir
 
@@ -1218,9 +1235,10 @@
                 oid, blobfilename = blobs.pop()
                 self._blob_data_bytes_loaded += os.stat(blobfilename).st_size
                 targetpath = self.fshelper.getPathForOID(oid, create=True)
-                rename_or_copy_blob(blobfilename,
-                                    self.fshelper.getBlobFilename(oid, tid),
-                                    )
+                ZODB.blob.rename_or_copy_blob(
+                    blobfilename,
+                    self.fshelper.getBlobFilename(oid, tid),
+                    )
                 if self._blob_data_bytes_loaded > self._blob_cache_size_check:
                     self._check_blob_size()
 
@@ -1579,6 +1597,7 @@
             raise ZODB.interfaces.StorageStopIteration()
         return ZODB.BaseStorage.DataRecord(*item)
 
+
 class ClientStorage308Adapter:
 
     def __init__(self, client):
@@ -1592,3 +1611,22 @@
 
     def __getattr__(self, name):
         return getattr(self.client, name)
+
+
+class BlobCacheLayout(object):
+
+    size = 997
+
+    def oid_to_path(self, oid):
+        return str(utils.u64(oid) % self.size)
+
+    def getBlobFilePath(self, oid, tid):
+        base, rem = divmod(utils.u64(oid), self.size)
+        return os.path.join(
+            str(rem),
+            "%s.%s%s" % (base, tid.encode('hex'), ZODB.blob.BLOB_SUFFIX)
+            )
+
+def _accessed(filename):
+    os.utime(filename, (time.time(), os.stat(filename).st_mtime))
+    return filename

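The download path in ClientStorage.py above coordinates multiple clients
and threads through per-directory '.lock' files: whoever takes the lock
fetches the blob from the server, and everyone else spins until it is
free.  A minimal sketch of that pattern, separate from the diff (the
helper name lock_blob_directory is illustrative, not part of the
commit):

    import os
    import time

    import zc.lockfile

    def lock_blob_directory(blob_filename, poll=0.01):
        # Spin until the per-directory '.lock' file guarding
        # blob_filename can be taken; zc.lockfile raises LockError
        # while another process or thread holds it.  The caller must
        # close() the returned lock.
        lockfilename = os.path.join(os.path.dirname(blob_filename), '.lock')
        while 1:
            try:
                return zc.lockfile.LockFile(lockfilename)
            except zc.lockfile.LockError:
                time.sleep(poll)

As in loadBlob() and openCommittedBlobFile() above, the lock is released
in a finally block once the blob file is in place.
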
Modified: ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/zeo_blob_cache.test
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/zeo_blob_cache.test	2008-12-02 22:15:03 UTC (rev 93555)
+++ ZODB/branches/jim-zeo-blob-cache/src/ZEO/tests/zeo_blob_cache.test	2008-12-02 22:51:20 UTC (rev 93556)
@@ -34,14 +34,22 @@
 blob_cache_size_check option defaults to 100. We passed 10, to check
 after writing 10% of the target size.
 
+We want to check for name collisions in the blob cache dir. We'll try
+to provoke name collisions by reducing the number of cache directory
+subdirectories.
+
+    >>> import ZEO.ClientStorage
+    >>> orig_blob_cache_layout_size = ZEO.ClientStorage.BlobCacheLayout.size
+    >>> ZEO.ClientStorage.BlobCacheLayout.size = 11
+
 Now, let's write some data:
 
-    >>> import ZODB.blob, transaction
+    >>> import ZODB.blob, transaction, time
     >>> conn = db.open()
     >>> for i in range(1, 101):
     ...     conn.root()[i] = ZODB.blob.Blob()
     ...     conn.root()[i].open('w').write(chr(i)*100)
-    ...     transaction.commit()
+    >>> transaction.commit()
 
 We've committed 10000 bytes of data, but our target size is 4000.  We
 expect to have not much more than the target size in the cache blob
@@ -66,8 +74,9 @@
 target:
 
     >>> for i in range(1, 101):
-    ...     if conn.root()[i].open().read() != chr(i)*100:
-    ...         print 'bad data', i
+    ...     data = conn.root()[i].open().read()
+    ...     if data != chr(i)*100:
+    ...         print 'bad data', `chr(i)`, `data`
 
     >>> db.storage._check_blob_size_thread.join()
 
@@ -75,16 +84,55 @@
     True
 
     >>> for i in range(1, 101):
-    ...     if conn.root()[i].open().read() != chr(i)*100:
-    ...         print 'bad data', i
+    ...     data = conn.root()[i].open().read()
+    ...     if data != chr(i)*100:
+    ...         print 'bad data', `chr(i)`, `data`
 
     >>> db.storage._check_blob_size_thread.join()
 
     >>> cache_size('blobs') < 5000
     True
 
+    >>> for i in range(1, 101):
+    ...     data = open(conn.root()[i].committed(), 'rb').read()
+    ...     if data != chr(i)*100:
+    ...         print 'bad data', `chr(i)`, `data`
 
+    >>> db.storage._check_blob_size_thread.join()
 
+    >>> cache_size('blobs') < 5000
+    True
+
+Now let's see if we can stress things a bit.  We'll create many clients
+and get them to pound on the blobs all at once to see if we can
+provoke problems:
+
+    >>> import threading, random
+    >>> def run():
+    ...     db = ZEO.DB(addr, blob_dir='blobs',
+    ...                 blob_cache_size=4000, blob_cache_size_check=10)
+    ...     conn = db.open()
+    ...     for i in range(300):
+    ...         time.sleep(0)
+    ...         i = random.randint(1, 100)
+    ...         data = conn.root()[i].open().read()
+    ...         if data != chr(i)*100:
+    ...             print 'bad data', `chr(i)`, `data`
+    ...     db._storage._check_blob_size_thread.join()
+    ...     db.close()
+
+    >>> threads = [threading.Thread(target=run) for i in range(10)]
+    >>> for thread in threads:
+    ...     thread.setDaemon(True)
+    >>> for thread in threads:
+    ...     thread.start()
+    >>> for thread in threads:
+    ...     thread.join()
+
+    >>> cache_size('blobs') < 5000
+    True
+
 .. cleanup
 
     >>> db.close()
+    >>> ZEO.ClientStorage.BlobCacheLayout.size = orig_blob_cache_layout_size

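The test above exercises the eviction logic in _check_blob_size_method:
cached blob files are totalled and grouped by access time, then the
least recently used are removed until the total is back under the
target.  A simplified sketch of that bookkeeping, using a plain dict
where the commit uses a BTree (the helper name cache_usage is
illustrative):

    import os

    def cache_usage(blob_dir, blob_suffix='.blob'):
        # Total the bytes of cached blob files under blob_dir and group
        # the file names by access time, least recently used first.
        size = 0
        files_by_atime = {}
        for base, dirs, files in os.walk(blob_dir):
            for file_name in files:
                if not file_name.endswith(blob_suffix):
                    continue
                file_name = os.path.join(base, file_name)
                stat = os.stat(file_name)
                size += stat.st_size
                files_by_atime.setdefault(stat.st_atime, []).append(file_name)
        return size, files_by_atime

Eviction then pops the oldest access times and removes files while the
total exceeds the target, skipping any directory whose '.lock' file is
currently held, as in the diff.
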
Modified: ZODB/branches/jim-zeo-blob-cache/src/ZODB/blob.py
===================================================================
--- ZODB/branches/jim-zeo-blob-cache/src/ZODB/blob.py	2008-12-02 22:15:03 UTC (rev 93555)
+++ ZODB/branches/jim-zeo-blob-cache/src/ZODB/blob.py	2008-12-02 22:51:20 UTC (rev 93556)
@@ -126,10 +126,29 @@
             self.readers = []
 
         if mode == 'r':
-            if self._current_filename() is None:
-                self._create_uncommitted_file()
+            result = None
+            to_open = self._p_blob_uncommitted
+            if not to_open:
+                to_open = self._p_blob_committed
+                if to_open:
+                    storage = self._p_jar._storage
+                    if hasattr(storage, 'openCommittedBlobFile'):
+                        result = storage.openCommittedBlobFile(
+                            self._p_oid, self._p_serial, self)
+                    else:
+                        # We do this to make sure we have the file and
+                        # to let the storage know we're accessing the file.
+                        # It might be nice to add a more explicit api for this.
+                        n = storage.loadBlob(self._p_oid, self._p_serial)
+                        assert to_open == n, (to_open, n) 
+                        result = BlobFile(to_open, mode, self)
+                else:
+                    self._create_uncommitted_file()
+                    to_open = self._p_blob_uncommitted
+                    assert to_open
 
-            result = BlobFile(self._current_filename(), mode, self)
+            if result is None:
+                result = BlobFile(to_open, mode, self)
 
             def destroyed(ref, readers=self.readers):
                 try:
@@ -178,8 +197,16 @@
             self._p_blob_committed.endswith(SAVEPOINT_SUFFIX)
             ):
             raise BlobError('Uncommitted changes')
-        return self._p_blob_committed
 
+        result = self._p_blob_committed
+        
+        # We do this to make sure we have the file and to let the
+        # storage know we're accessing the file.
+        n = self._p_jar._storage.loadBlob(self._p_oid, self._p_serial)
+        assert result == n, (result, n)
+
+        return result
+
     def consumeFile(self, filename):
         """Will replace the current data of the blob with the file given under
         filename.
@@ -231,11 +258,6 @@
 
     # utility methods
 
-    def _current_filename(self):
-        # NOTE: _p_blob_committed and _p_blob_uncommitted appear by virtue of
-        # Connection._setstate
-        return self._p_blob_uncommitted or self._p_blob_committed
-
     def _create_uncommitted_file(self):
         assert self._p_blob_uncommitted is None, (
             "Uncommitted file already exists.")
@@ -388,13 +410,13 @@
         'committed' blob file related to that oid and tid.
 
         """
-        oid_path = self.getPathForOID(oid)
         # TIDs are numbers and sometimes passed around as integers. For our
         # computations we rely on the 64-bit packed string representation
         if isinstance(tid, int):
             tid = utils.p64(tid)
-        filename = "%s%s" % (utils.tid_repr(tid), BLOB_SUFFIX)
-        return os.path.join(oid_path, filename)
+        return os.path.join(self.base_dir,
+                            self.layout.getBlobFilePath(oid, tid),
+                            )
 
     def blob_mkstemp(self, oid, tid):
         """Given an oid and a tid, return a temporary file descriptor
@@ -513,10 +535,18 @@
         oid = ''.join(binascii.unhexlify(byte[2:]) for byte in path)
         return oid
 
+    def getBlobFilePath(self, oid, tid):
+        """Given an oid and a tid, return the full filename of the
+        'committed' blob file related to that oid and tid.
+
+        """
+        oid_path = self.oid_to_path(oid)
+        filename = "%s%s" % (utils.tid_repr(tid), BLOB_SUFFIX)
+        return os.path.join(oid_path, filename)
+
 LAYOUTS['bushy'] = BushyLayout()
 
-
-class LawnLayout(object):
+class LawnLayout(BushyLayout):
     """A shallow directory layout for blob directories.
 
     Creates a single level of directories (one for each oid).

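The new BlobCacheLayout spreads cache entries over a fixed number of
subdirectories (997 by default) so no single directory grows without
bound, while getBlobFilePath keeps the oid and tid recoverable from the
file name.  A worked example of the mapping (the oid and tid values are
arbitrary; Python 2, as in the rest of the branch):

    from ZODB import utils

    # With the layout's default modulus of 997, oid 12345 = 12 * 997 + 381,
    # so the committed blob for tid 3 is cached at
    # "381/12.0000000000000003.blob" under the cache directory.
    oid = utils.p64(12345)
    tid = utils.p64(3)
    base, rem = divmod(utils.u64(oid), 997)
    print "%s/%s.%s%s" % (rem, base, tid.encode('hex'), '.blob')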

