[Zodb-checkins] SVN: ZODB/trunk/src/ Improved zeo blob cache clean up to make it a bit more robust and to

Jim Fulton jim at zope.com
Fri Jun 5 18:54:00 EDT 2009


Log message for revision 100661:
  Improved zeo blob cache clean up to make it a bit more robust and to
  avoid spurious test failures.
  

Changed:
  U   ZODB/trunk/src/CHANGES.txt
  U   ZODB/trunk/src/ZEO/ClientStorage.py
  U   ZODB/trunk/src/ZEO/tests/zeo_blob_cache.test

-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt	2009-06-05 22:53:38 UTC (rev 100660)
+++ ZODB/trunk/src/CHANGES.txt	2009-06-05 22:54:00 UTC (rev 100661)
@@ -14,6 +14,8 @@
 
 - Fixed analyze.py and added test.
 
+- ZEO client blob cache size management is a little bit more robust.
+
 3.9.0b1 (2009-05-04)
 ====================
 

Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py	2009-06-05 22:53:38 UTC (rev 100660)
+++ ZODB/trunk/src/ZEO/ClientStorage.py	2009-06-05 22:54:00 UTC (rev 100661)
@@ -38,6 +38,7 @@
 import stat
 import sys
 import tempfile
+import thread
 import threading
 import time
 import types
@@ -398,6 +399,7 @@
         self._blob_cache_size = blob_cache_size
         self._blob_data_bytes_loaded = 0
         if blob_cache_size is not None:
+            assert blob_cache_size_check < 100
             self._blob_cache_size_check = (
                 blob_cache_size * blob_cache_size_check / 100)
             self._check_blob_size()
@@ -477,7 +479,7 @@
 
         check_blob_size_thread = threading.Thread(
             target=_check_blob_cache_size,
-            args=(self.blob_dir, self._blob_cache_size),
+            args=(self.blob_dir, target),
             )
         check_blob_size_thread.setDaemon(True)
         check_blob_size_thread.start()
@@ -1620,7 +1622,6 @@
 def _check_blob_cache_size(blob_dir, target):
 
     logger = logging.getLogger(__name__+'.check_blob_cache')
-    logger.info("Checking blob cache size")
 
     layout = open(os.path.join(blob_dir, ZODB.blob.LAYOUT_MARKER)
                   ).read().strip()
@@ -1628,64 +1629,90 @@
         logger.critical("Invalid blob directory layout %s", layout)
         raise ValueError("Invalid blob directory layout", layout)
 
+    attempt_path = os.path.join(blob_dir, 'check_size.attempt')
+
     try:
         check_lock = zc.lockfile.LockFile(
             os.path.join(blob_dir, 'check_size.lock'))
     except zc.lockfile.LockError:
-        # Someone is already cleaning up, so don't bother
-        logger.info("Another thread is checking the blob cache size")
-        return
+        try:
+            time.sleep(1)
+            check_lock = zc.lockfile.LockFile(
+                os.path.join(blob_dir, 'check_size.lock'))
+        except zc.lockfile.LockError:
+            # Someone is already cleaning up, so don't bother
+            logger.debug("%s Another thread is checking the blob cache size.",
+                         thread.get_ident())
+            open(attempt_path, 'w').close() # Mark that we tried
+            return
 
+    logger.debug("%s Checking blob cache size. (target: %s)",
+                 thread.get_ident(), target)
+
     try:
-        size = 0
-        blob_suffix = ZODB.blob.BLOB_SUFFIX
-        files_by_atime = BTrees.IOBTree.BTree()
+        while 1:
+            size = 0
+            blob_suffix = ZODB.blob.BLOB_SUFFIX
+            files_by_atime = BTrees.OOBTree.BTree()
 
-        for dirname in os.listdir(blob_dir):
-            if not cache_file_name(dirname):
-                continue
-            base = os.path.join(blob_dir, dirname)
-            if not os.path.isdir(base):
-                continue
-            for file_name in os.listdir(base):
-                if not file_name.endswith(blob_suffix):
+            for dirname in os.listdir(blob_dir):
+                if not cache_file_name(dirname):
                     continue
-                file_name = os.path.join(base, file_name)
-                if not os.path.isfile(file_name):
+                base = os.path.join(blob_dir, dirname)
+                if not os.path.isdir(base):
                     continue
-                stat = os.stat(file_name)
-                size += stat.st_size
-                t = int(stat.st_atime)
-                if t not in files_by_atime:
-                    files_by_atime[t] = []
-                files_by_atime[t].append(file_name)
+                for file_name in os.listdir(base):
+                    if not file_name.endswith(blob_suffix):
+                        continue
+                    file_path = os.path.join(base, file_name)
+                    if not os.path.isfile(file_path):
+                        continue
+                    stat = os.stat(file_path)
+                    size += stat.st_size
+                    t = stat.st_atime
+                    if t not in files_by_atime:
+                        files_by_atime[t] = []
+                    files_by_atime[t].append(os.path.join(dirname, file_name))
 
-        logger.info("blob cache size: %s", size)
+            logger.debug("%s   blob cache size: %s", thread.get_ident(), size)
 
-        while size > target and files_by_atime:
-            for file_name in files_by_atime.pop(files_by_atime.minKey()):
-                lockfilename = os.path.join(os.path.dirname(file_name),
-                                            '.lock')
-                try:
-                    lock = zc.lockfile.LockFile(lockfilename)
-                except zc.lockfile.LockError:
-                    logger.info("Skipping locked %s",
-                                os.path.basename(file_name))
-                    continue  # In use, skip
+            if size <= target:
+                if os.path.isfile(attempt_path):
+                    os.remove(attempt_path)
+                    continue
+                logger.debug("%s   -->", thread.get_ident())
+                break
 
-                try:
-                    fsize = os.stat(file_name).st_size
+            while size > target and files_by_atime:
+                for file_name in files_by_atime.pop(files_by_atime.minKey()):
+                    file_name = os.path.join(blob_dir, file_name)
+                    lockfilename = os.path.join(os.path.dirname(file_name),
+                                                '.lock')
                     try:
-                        ZODB.blob.remove_committed(file_name)
-                    except OSError, v:
-                        pass # probably open on windows
-                    else:
-                        size -= fsize
-                finally:
-                    lock.close()
+                        lock = zc.lockfile.LockFile(lockfilename)
+                    except zc.lockfile.LockError:
+                        logger.debug("%s Skipping locked %s",
+                                     thread.get_ident(),
+                                     os.path.basename(file_name))
+                        continue  # In use, skip
 
-        logger.info("reduced blob cache size: %s", size)
+                    try:
+                        fsize = os.stat(file_name).st_size
+                        try:
+                            ZODB.blob.remove_committed(file_name)
+                        except OSError, v:
+                            pass # probably open on windows
+                        else:
+                            size -= fsize
+                    finally:
+                        lock.close()
 
+                    if size <= target:
+                        break
+
+            logger.debug("%s   reduced blob cache size: %s",
+                         thread.get_ident(), size)
+
     finally:
         check_lock.close()
 

Modified: ZODB/trunk/src/ZEO/tests/zeo_blob_cache.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/zeo_blob_cache.test	2009-06-05 22:53:38 UTC (rev 100660)
+++ ZODB/trunk/src/ZEO/tests/zeo_blob_cache.test	2009-06-05 22:54:00 UTC (rev 100661)
@@ -33,6 +33,11 @@
 blob_cache_size_check option defaults to 100. We passed 10, to check
 after writing 10% of the target size.
 
+.. We're going to wait for any threads we started to finish, so...
+
+   >>> import threading
+   >>> old_threads = list(threading.enumerate())
+
 We want to check for name collections in the blob cache dir. We'll try
 to provoke name collections by reducing the number of cache directory
 subdirectories.
@@ -66,12 +71,15 @@
     ...                      if os.path.exists(os.path.join(base, f)):
     ...                          raise
     ...     return size
-    
-    >>> db.storage._check_blob_size_thread.join()
 
-    >>> cache_size('blobs') < 5000
-    True
+    >>> def check():
+    ...     return cache_size('blobs') < 5000
+    >>> def onfail():
+    ...     return cache_size('blobs')
 
+    >>> from ZEO.tests.forker import wait_until
+    >>> wait_until("size is reduced", check, 99, onfail)
+
 If we read all of the blobs, data will be downloaded again, as
 necessary, but the cache size will remain not much bigger than the
 target:
@@ -81,38 +89,27 @@
     ...     if data != chr(i)*100:
     ...         print 'bad data', `chr(i)`, `data`
 
-    >>> db.storage._check_blob_size_thread.join()
+    >>> wait_until("size is reduced", check, 99, onfail)
 
-    >>> cache_size('blobs') < 5000
-    True
-
     >>> for i in range(1, 101):
     ...     data = conn.root()[i].open().read()
     ...     if data != chr(i)*100:
     ...         print 'bad data', `chr(i)`, `data`
 
-    >>> db.storage._check_blob_size_thread.join()
-
     >>> for i in range(1, 101):
     ...     data = conn.root()[i].open('c').read()
     ...     if data != chr(i)*100:
     ...         print 'bad data', `chr(i)`, `data`
 
-    >>> db.storage._check_blob_size_thread.join()
+    >>> wait_until("size is reduced", check, 99, onfail)
 
-    >>> cache_size('blobs') < 5000
-    True
-
     >>> for i in range(1, 101):
     ...     data = open(conn.root()[i].committed(), 'rb').read()
     ...     if data != chr(i)*100:
     ...         print 'bad data', `chr(i)`, `data`
 
-    >>> db.storage._check_blob_size_thread.join()
+    >>> wait_until("size is reduced", check, 99, onfail)
 
-    >>> cache_size('blobs') < 5000
-    True
-
 Now let see if we can stress things a bit.  We'll create many clients
 and get them to pound on the blobs all at once to see if we can
 provoke problems:
@@ -131,7 +128,6 @@
     ...         data = conn.root()[i].open('c').read()
     ...         if data != chr(i)*100:
     ...             print 'bad data', `chr(i)`, `data`
-    ...     db._storage._check_blob_size_thread.join()
     ...     db.close()
 
     >>> threads = [threading.Thread(target=run) for i in range(10)]
@@ -140,12 +136,18 @@
     >>> for thread in threads:
     ...     thread.start()
     >>> for thread in threads:
-    ...     thread.join()
+    ...     thread.join(99)
+    ...     if thread.isAlive():
+    ...        print "Can't join thread."
 
-    >>> cache_size('blobs') < 5000
-    True
+    >>> wait_until("size is reduced", check, 99, onfail)
 
 .. cleanup
 
+    >>> for thread in threading.enumerate():
+    ...     if thread not in old_threads:
+    ...        thread.join(33)
+
     >>> db.close()
     >>> ZEO.ClientStorage.BlobCacheLayout.size = orig_blob_cache_layout_size
+



More information about the Zodb-checkins mailing list