[Zodb-checkins] SVN: ZODB/branches/blob-merge-branch/src/Z Factor out blob cache storage into a helper class for use by both ClientStorage and BlobStorage.

Chris McDonough chrism at plope.com
Mon Feb 27 14:11:08 EST 2006


Log message for revision 65531:
  Factor out blob cache storage into a helper class for use by both ClientStorage and BlobStorage.
  

Changed:
  U   ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py
  U   ZODB/branches/blob-merge-branch/src/ZEO/tests/testZEO.py
  U   ZODB/branches/blob-merge-branch/src/ZODB/Blobs/Blob.py
  U   ZODB/branches/blob-merge-branch/src/ZODB/Blobs/BlobStorage.py
  U   ZODB/branches/blob-merge-branch/src/ZODB/Blobs/tests/packing.txt

-=-
Modified: ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py	2006-02-27 19:09:29 UTC (rev 65530)
+++ ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py	2006-02-27 19:11:08 UTC (rev 65531)
@@ -35,11 +35,11 @@
 from ZEO.auth import get_module
 from ZEO.zrpc.client import ConnectionManager
 
-from ZODB.Blobs.BlobStorage import BLOB_SUFFIX 
 from ZODB import POSException
 from ZODB import utils
 from ZODB.loglevels import BLATHER
 from ZODB.Blobs.interfaces import IBlobStorage
+from ZODB.Blobs.Blob import FilesystemHelper
 from persistent.TimeStamp import TimeStamp
 
 logger = logging.getLogger('ZEO.ClientStorage')
@@ -316,17 +316,13 @@
 
         # XXX need to check for POSIX-ness here
         if blob_dir is not None:
-            if not os.path.exists(blob_dir):
-                os.makedirs(blob_dir, 0700)
-                log2("Blob cache directory '%s' does not exist. "
-                            "Created new directory." % self.base_directory,
-                            level=logging.INFO)
-            if (os.stat(blob_dir).st_mode & 077) != 0:
+            self.fshelper = FilesystemHelper(blob_dir)
+            self.fshelper.create()
+            if not self.fshelper.isSecure(blob_dir):
                 log2('Blob dir %s has insecure mode setting' % blob_dir,
                      level=logging.WARNING)
-
-        self.blob_dir = blob_dir
-
+        else:
+            self.fshelper = None
         # Initialize locks
         self.blob_status_lock = threading.Lock()
         self.blob_status = {}
@@ -929,37 +925,21 @@
         os.unlink(blobfilename)
         return serials
 
-    def _getBlobPath(self, oid):
-        return os.path.join(self.blob_dir,
-                            utils.oid_repr(oid)
-                            )
-
-    def _getLoadingFilename(self, oid, serial):
-        """Generate an intermediate filename for two-phase commit.
-        """
-        return self._getCleanFilename(oid, serial) + ".loading" 
-
-    def _getCleanFilename(self, oid, tid):
-        return os.path.join(self._getBlobPath(oid),
-                            "%s%s" % (utils.tid_repr(tid), 
-                                         BLOB_SUFFIX,)
-                            )
-
     def _do_load_blob(self, oid, serial, version):
         """Do the actual loading from the RPC server."""
-        blob_filename = self._getCleanFilename(oid, serial)
+        blob_filename = self.fshelper.getBlobFilename(oid, serial)
         if self._server is None:
             raise ClientDisconnected()
 
-        targetpath = self._getBlobPath(oid)
+        targetpath = self.fshelper.getPathForOID(oid)
         if not os.path.exists(targetpath):
             os.makedirs(targetpath, 0700)
 
         # We write to a temporary file first, so we do not accidentally 
         # allow half-baked copies of this blob be loaded
-        tempfilename = self._getLoadingFilename(oid, serial)
-        tempfile = open(tempfilename, "wb")
-        
+        tempfd, tempfilename = self.fshelper.blob_mkstemp(oid, serial)
+        tempfile = os.fdopen(tempfd, 'wb')
+
         offset = 0
         while True:
             chunk = self._server.loadBlob(oid, serial, version, offset)
@@ -982,11 +962,11 @@
             2a. Wait for other download to finish, return 
             3. If not beeing downloaded, start download
         """
-        if self.blob_dir is None:
+        if self.fshelper is None:
             raise POSException.Unsupported("No blob cache directory is "
                                            "configured.")
 
-        blob_filename = self._getCleanFilename(oid, serial)
+        blob_filename = self.fshelper.getBlobFilename(oid, serial)
         # Case 1: Blob is available already, just use it
         if os.path.exists(blob_filename):
             return blob_filename

Modified: ZODB/branches/blob-merge-branch/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/blob-merge-branch/src/ZEO/tests/testZEO.py	2006-02-27 19:09:29 UTC (rev 65530)
+++ ZODB/branches/blob-merge-branch/src/ZEO/tests/testZEO.py	2006-02-27 19:11:08 UTC (rev 65531)
@@ -308,7 +308,7 @@
                 super(statusdict, self).__delitem__(k)
 
         # ensure that we do locking properly
-        filename = self._storage._getCleanFilename(oid, serial)
+        filename = self._storage.fshelper.getBlobFilename(oid, serial)
         thestatuslock = self._storage.blob_status_lock = Dummy()
         thebloblock = Dummy()
 

Modified: ZODB/branches/blob-merge-branch/src/ZODB/Blobs/Blob.py
===================================================================
--- ZODB/branches/blob-merge-branch/src/ZODB/Blobs/Blob.py	2006-02-27 19:09:29 UTC (rev 65530)
+++ ZODB/branches/blob-merge-branch/src/ZODB/Blobs/Blob.py	2006-02-27 19:11:08 UTC (rev 65531)
@@ -2,6 +2,7 @@
 import os
 import time
 import tempfile
+import logging
 
 from zope.interface import implements
 
@@ -12,6 +13,8 @@
 from transaction.interfaces import IDataManager
 from persistent import Persistent
 
+BLOB_SUFFIX = ".blob"
+
 class Blob(Persistent):
  
     implements(IBlob)
@@ -265,3 +268,87 @@
         # we'll assume they will be for now in the name of not
         # muddying the code needlessly.
         self.close()
+
+logger = logging.getLogger('ZODB.Blobs')
+_pid = str(os.getpid())
+
+def log(msg, level=logging.INFO, subsys=_pid, exc_info=False):
+    message = "(%s) %s" % (subsys, msg)
+    logger.log(level, message, exc_info=exc_info)
+
+class FilesystemHelper:
+
+    # Storages that implement IBlobStorage can choose to use this
+    # helper class to generate and parse blob filenames.  This is not
+    # a set-in-stone interface for all filesystem operations dealing
+    # with blobs and storages needn't indirect through this if they
+    # want to perform blob storage differently.
+
+    def __init__(self, base_dir):
+        self.base_dir = base_dir
+
+    def create(self):
+        if not os.path.exists(self.base_dir):
+            os.makedirs(self.base_dir, 0700)
+            log("Blob cache directory '%s' does not exist. "
+                "Created new directory." % self.base_dir,
+                level=logging.INFO)
+
+    def isSecure(self, path):
+        """ Return True if (POSIX) path mode bits are 0700 """
+        return (os.stat(path).st_mode & 077) == 0
+
+    def getPathForOID(self, oid):
+        """ Given an OID, return the path on the filesystem where
+        the blob data relating to that OID is stored """
+        return os.path.join(self.base_dir, utils.oid_repr(oid))
+
+    def getBlobFilename(self, oid, tid):
+        """ Given an oid and a tid, return the full filename of the
+        'committed' blob file related to that oid and tid. """
+        oid_path = self.getPathForOID(oid)
+        filename = "%s%s" % (utils.tid_repr(tid), BLOB_SUFFIX)
+        return os.path.join(oid_path, filename)
+
+    def blob_mkstemp(self, oid, tid):
+        """ Given an oid and a tid, return a temporary file descriptor
+        and a related filename.  The file is guaranteed to exist on
+        the same partition as committed data, which is important for
+        being able to rename the file without a copy operation.  The
+        directory in which the file will be placed, which is the
+        return value of self.getPathForOID(oid), must exist before
+        this method may be called successfully."""
+        oidpath = self.getPathForOID(oid)
+        fd, name = tempfile.mkstemp(suffix='.tmp', prefix=utils.tid_repr(tid),
+                                    dir=oidpath)
+        return fd, name
+
+    def splitBlobFilename(self, filename):
+        """Returns the oid and tid for a given blob filename.
+
+        If the filename cannot be recognized as a blob filename, (None, None)
+        is returned.
+        """
+        if not filename.endswith(BLOB_SUFFIX):
+            return None, None
+        path, filename = os.path.split(filename)
+        oid = os.path.split(path)[1]
+
+        serial = filename[:-len(BLOB_SUFFIX)]
+        oid = utils.repr_to_oid(oid)
+        serial = utils.repr_to_oid(serial)
+        return oid, serial 
+
+    def getOIDsForSerial(self, search_serial):
+        """ Return all oids related to a particular tid that exist in
+        blob data """
+        oids = []
+        base_dir = self.base_dir
+        for oidpath in os.listdir(base_dir):
+            for filename in os.listdir(os.path.join(base_dir, oidpath)):
+                blob_path = os.path.join(base_dir, oidpath, filename)
+                oid, serial = self.splitBlobFilename(blob_path)
+                if search_serial == serial:
+                    oids.append(oid)
+        return oids
+        

Modified: ZODB/branches/blob-merge-branch/src/ZODB/Blobs/BlobStorage.py
===================================================================
--- ZODB/branches/blob-merge-branch/src/ZODB/Blobs/BlobStorage.py	2006-02-27 19:09:29 UTC (rev 65530)
+++ ZODB/branches/blob-merge-branch/src/ZODB/Blobs/BlobStorage.py	2006-02-27 19:11:08 UTC (rev 65531)
@@ -23,9 +23,9 @@
 from ZODB import utils
 from ZODB.Blobs.interfaces import IBlobStorage, IBlob
 from ZODB.POSException import POSKeyError
+from ZODB.Blobs.Blob import BLOB_SUFFIX
+from ZODB.Blobs.Blob import FilesystemHelper
 
-BLOB_SUFFIX = ".blob"
-
 logger = logging.getLogger('ZODB.BlobStorage')
 
 class BlobStorage(ProxyBase):
@@ -33,7 +33,7 @@
 
     implements(IBlobStorage)
 
-    __slots__ = ('base_directory', 'dirty_oids')
+    __slots__ = ('fshelper', 'base_directory', 'dirty_oids')
     # Proxies can't have a __dict__ so specifying __slots__ here allows
     # us to have instance attributes explicitly on the proxy.
 
@@ -43,7 +43,8 @@
     def __init__(self, base_directory, storage):    
         # TODO Complain if storage is ClientStorage
         ProxyBase.__init__(self, storage)
-        self.base_directory = base_directory
+        self.fshelper = FilesystemHelper(base_directory)
+        self.base_directory = base_directory
         if not os.path.exists(self.base_directory):
             os.makedirs(self.base_directory, 0700)
             logger.info("Blob directory '%s' does not exist. "
@@ -64,11 +64,11 @@
 
         self._lock_acquire()
         try:
-            targetpath = self._getBlobPath(oid)
+            targetpath = self.fshelper.getPathForOID(oid)
             if not os.path.exists(targetpath):
                 os.makedirs(targetpath, 0700)
                               
-            targetname = self._getCleanFilename(oid, serial)
+            targetname = self.fshelper.getBlobFilename(oid, serial)
             os.rename(blobfilename, targetname)
 
             # XXX if oid already in there, something is really hosed.
@@ -78,17 +78,6 @@
             self._lock_release()
         return self._tid
 
-    def _getBlobPath(self, oid):
-        return os.path.join(self.base_directory,
-                            utils.oid_repr(oid)
-                            )
-
-    def _getCleanFilename(self, oid, tid):
-        return os.path.join(self._getBlobPath(oid),
-                            "%s%s" % (utils.tid_repr(tid), 
-                                      BLOB_SUFFIX,)
-                            )
-
     def tpc_finish(self, *arg, **kw):
         """ We need to override the base storage's tpc_finish instead of
         providing a _finish method because methods found on the proxied object
@@ -103,14 +92,14 @@
         getProxiedObject(self).tpc_abort(*arg, **kw)
         while self.dirty_oids:
             oid, serial = self.dirty_oids.pop()
-            clean = self._getCleanFilename(oid, serial)
+            clean = self.fshelper.getBlobFilename(oid, serial)
             if os.exists(clean):
                 os.unlink(clean) 
 
     def loadBlob(self, oid, serial, version):
         """Return the filename where the blob file can be found.
         """
-        filename = self._getCleanFilename(oid, serial)
+        filename = self.fshelper.getBlobFilename(oid, serial)
         if not os.path.exists(filename):
             raise POSKeyError, "Not an existing blob."
         return filename
@@ -125,17 +114,18 @@
         # XXX we should be tolerant of "garbage" directories/files in
         # the base_directory here.
 
-        for oid_repr in os.listdir(self.base_directory):
+        base_dir = self.fshelper.base_dir
+        for oid_repr in os.listdir(base_dir):
             oid = utils.repr_to_oid(oid_repr)
-            oid_path = os.path.join(self.base_directory, oid_repr)
+            oid_path = os.path.join(base_dir, oid_repr)
             files = os.listdir(oid_path)
             files.sort()
 
             for filename in files:
                 filepath = os.path.join(oid_path, filename)
-                whatever, serial = self._splitBlobFilename(filepath)
+                whatever, serial = self.fshelper.splitBlobFilename(filepath)
                 try:
-                    fn = self._getCleanFilename(oid, serial)
+                    fn = self.fshelper.getBlobFilename(oid, serial)
                     self.loadSerial(oid, serial)
                 except POSKeyError:
                     os.unlink(filepath)
@@ -144,9 +134,10 @@
                 shutil.rmtree(oid_path)
 
     def _packNonUndoing(self, packtime, referencesf):
-        for oid_repr in os.listdir(self.base_directory):
+        base_dir = self.fshelper.base_dir
+        for oid_repr in os.listdir(base_dir):
             oid = utils.repr_to_oid(oid_repr)
-            oid_path = os.path.join(self.base_directory, oid_repr)
+            oid_path = os.path.join(base_dir, oid_repr)
             exists = True
 
             try:
@@ -193,41 +184,29 @@
         orig_size = getProxiedObject(self).getSize()
         
         blob_size = 0
-        for oid in os.listdir(self.base_directory):
-            for serial in os.listdir(os.path.join(self.base_directory, oid)):
+        base_dir = self.fshelper.base_dir
+        for oid in os.listdir(base_dir):
+            for serial in os.listdir(os.path.join(base_dir, oid)):
                 if not serial.endswith(BLOB_SUFFIX):
                     continue
-                file_path = os.path.join(self.base_directory, oid, serial)
+                file_path = os.path.join(base_dir, oid, serial)
                 blob_size += os.stat(file_path).st_size
         
         return orig_size + blob_size
 
-    def _splitBlobFilename(self, filename):
-        """Returns OID, TID for a given blob filename.
-
-        If it's not a blob filename, (None, None) is returned.
-        """
-        if not filename.endswith(BLOB_SUFFIX):
-            return None, None
-        path, filename = os.path.split(filename)
-        oid = os.path.split(path)[1]
-
-        serial = filename[:-len(BLOB_SUFFIX)]
-        oid = utils.repr_to_oid(oid)
-        serial = utils.repr_to_oid(serial)
-        return oid, serial 
-
     def undo(self, serial_id, transaction):
         serial, keys = getProxiedObject(self).undo(serial_id, transaction)
         self._lock_acquire()
         try:
             # The old serial_id is given in base64 encoding ...
             serial_id = base64.decodestring(serial_id+ '\n')
-            for oid in self._getOIDsForSerial(serial_id):
-                data, serial_before, serial_after = \
-                        self.loadBefore(oid, serial_id) 
-                orig = file(self._getCleanFilename(oid, serial_before), "r")
-                new = file(self._getCleanFilename(oid, serial), "w")
+            for oid in self.fshelper.getOIDsForSerial(serial_id):
+                data, serial_before, serial_after = self.loadBefore(oid,
+                                                                    serial_id) 
+                orig_fn = self.fshelper.getBlobFilename(oid, serial_before)
+                orig = open(orig_fn, "rb")
+                new_fn = self.fshelper.getBlobFilename(oid, serial)
+                new = open(new_fn, "wb")
                 utils.cp(orig, new)
                 orig.close()
                 new.close()
@@ -236,14 +215,3 @@
             self._lock_release()
         return serial, keys
 
-    def _getOIDsForSerial(self, search_serial):
-        oids = []
-        for oidpath in os.listdir(self.base_directory):
-            for filename in os.listdir(os.path.join(self.base_directory,
-                                     oidpath)):
-                blob_path = os.path.join(self.base_directory, oidpath, 
-                                         filename)
-                oid, serial = self._splitBlobFilename(blob_path)
-                if search_serial == serial:
-                    oids.append(oid)
-        return oids

Modified: ZODB/branches/blob-merge-branch/src/ZODB/Blobs/tests/packing.txt
===================================================================
--- ZODB/branches/blob-merge-branch/src/ZODB/Blobs/tests/packing.txt	2006-02-27 19:09:29 UTC (rev 65530)
+++ ZODB/branches/blob-merge-branch/src/ZODB/Blobs/tests/packing.txt	2006-02-27 19:11:08 UTC (rev 65531)
@@ -83,13 +83,13 @@
     >>> tids.append(blob_storage._tid)
 
     >>> oid = root['blob']._p_oid
-    >>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
+    >>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
     >>> [ os.path.exists(x) for x in fns ]
     [True, True, True, True, True]
 
 Get our blob filenames for this oid.
 
-    >>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
+    >>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
 
 Do a pack to the slightly before the first revision was written:
 
@@ -203,13 +203,13 @@
     >>> tids.append(blob_storage._tid)
 
     >>> oid = root['blob']._p_oid
-    >>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
+    >>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
     >>> [ os.path.exists(x) for x in fns ]
     [True, True, True, True, True]
 
 Get our blob filenames for this oid.
 
-    >>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
+    >>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
 
 Do a pack to the slightly before the first revision was written:
 



More information about the Zodb-checkins mailing list