[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/ Made a number of blob changes:

Jim Fulton jim at zope.com
Wed Jun 6 12:25:13 EDT 2007


Log message for revision 76436:
  Made a number of blob changes:
  
  - Unwritten blobs can now be read, and are empty.
  
  - Blobs are considered modified when opened for writing.  This is a
     little bit more conservative than before, but it fixes a bug: opening
     a file with 'w' does modify the file, yet that wasn't considered to
     be a change before.
  
  - Non-optimistic savepoints now work.
  
  - Fixed bug: could open multiple files for writing.
  
  - Fixed bug: aborting a transaction removed uncommitted data for
     uncommitted blobs.
  
  Todo: 
     Need to remove uncommitted data file if a blob is GCed even when a
     transaction isn't aborted or when it hasn't been added to anything.
  
  - No longer close files on transaction boundaries.
  
  This allows us to get rid of the transaction-manager dance.
  

Changed:
  U   ZODB/trunk/src/ZODB/blob.py
  U   ZODB/trunk/src/ZODB/tests/blob_basic.txt
  U   ZODB/trunk/src/ZODB/tests/blob_transaction.txt

-=-
Modified: ZODB/trunk/src/ZODB/blob.py
===================================================================
--- ZODB/trunk/src/ZODB/blob.py	2007-06-06 16:21:05 UTC (rev 76435)
+++ ZODB/trunk/src/ZODB/blob.py	2007-06-06 16:25:12 UTC (rev 76436)
@@ -16,12 +16,14 @@
 
 import base64
 import logging
+import logging
 import os
 import shutil
 import sys
-import time
 import tempfile
-import logging
+import threading
+import time
+import weakref
 
 import zope.interface
 
@@ -42,78 +44,123 @@
 
 valid_modes = 'r', 'w', 'r+', 'a'
 
+# Threading issues:
+# We want to support closing blob files when they are destroyed.
+# This introduces a threading issue, since a blob file may be destroyed
+# via GC in any thread.
+
+
 class Blob(persistent.Persistent):
     """A BLOB supports efficient handling of large data within ZODB."""
 
     zope.interface.implements(ZODB.interfaces.IBlob)
 
-    _os_link = os.rename
-
-    _p_blob_readers = 0
-    _p_blob_writers = 0
     _p_blob_uncommitted = None  # Filename of the uncommitted (dirty) data
-    _p_blob_data = None         # Filename of the committed data
+    _p_blob_committed = None    # Filename of the committed data
 
-    # All persistent object store a reference to their data manager, a database
-    # connection in the _p_jar attribute. So we are going to do the same with
-    # blobs here.
-    _p_blob_manager = None
+    readers = writers = None
+    def __setstate__(self, state=None):
+        # We use lists here because it will allow us to add and remove
+        # atomically
+        self.readers = []
+        self.writers = []
+        
+    __init__ = __setstate__
 
-    # Blobs need to participate in transactions even when not connected to
-    # a database yet. If you want to use a non-default transaction manager,
-    # you can override it via _p_blob_transaction. This is currently
-    # required for unit testing.
-    _p_blob_transaction = None
+    def __getstate__(self):
+        return None
 
-    def open(self, mode="r"):
-        """Returns a file(-like) object representing blob data."""
-        result = None
+    def _p_deactivate(self):
+        # Only ghostify if we are unopened.
+        if self.readers or self.writers:
+            return
+        super(Blob, self)._p_deactivate()
+
+    def _p_invalidate(self):
+        # Force-close any open readers or writers,
+        # XXX should we warn of this? Maybe?
+        if self._p_changed is None:
+            return
+        for ref in self.readers+self.writers:
+            f = ref()
+            if f is not None:
+                f.close()
+
+        if (self._p_blob_uncommitted
+            and os.path.exists(self._p_blob_uncommitted)
+            ):
+            os.remove(self._p_blob_uncommitted)
             
+        super(Blob, self)._p_invalidate()
+
+    @property
+    def opened(self):
+        return bool(self.readers or self.writers)
+
+    def closed(self, f):
+        
+        # We use try/except below because, if the file is GCed, another
+        # thread might remove the ref after we check it.
+
+        for file_refs in (self.readers, self.writers):
+            for ref in file_refs:
+                if ref() is f:
+                    try:
+                        file_refs.remove(ref)
+                    except ValueError:
+                        pass
+                    return
+
+    def open(self, mode="r"):
         if mode not in valid_modes:
             raise ValueError("invalid mode", mode)
 
+        if self.writers:
+            raise BlobError("Already opened for writing.")
+
         if mode == 'r':
             if self._current_filename() is None:
-                raise BlobError("Blob does not exist.")
+                self._create_uncommitted_file()
 
-            if self._p_blob_writers != 0:
-                raise BlobError("Already opened for writing.")
-
-            self._p_blob_readers += 1
             result = BlobFile(self._current_filename(), mode, self)
 
-        elif mode == 'w':
-            if self._p_blob_readers != 0:
+            def destroyed(ref, readers=self.readers):
+                try:
+                    readers.remove(ref)
+                except ValueError:
+                    pass
+            
+            self.readers.append(weakref.ref(result, destroyed))
+        else:
+            if self.readers:
                 raise BlobError("Already opened for reading.")
 
-            self._p_blob_writers += 1
-            if self._p_blob_uncommitted is None:
-                self._create_uncommitted_file()
-            result = BlobFile(self._p_blob_uncommitted, mode, self)
-
-        elif mode in ('a', 'r+'):
-            if self._p_blob_readers != 0:
-                raise BlobError("Already opened for reading.")
-
-            if self._p_blob_uncommitted is None:
-                # Create a new working copy
-                uncommitted = BlobFile(self._create_uncommitted_file(),
-                                       mode, self)
-                # NOTE: _p_blob data appears by virtue of Connection._setstate
-                utils.cp(file(self._p_blob_data), uncommitted)
-                uncommitted.seek(0)
+            if mode == 'w':
+                if self._p_blob_uncommitted is None:
+                    self._create_uncommitted_file()
+                result = BlobFile(self._p_blob_uncommitted, mode, self)
             else:
-                # Re-use existing working copy
-                uncommitted = BlobFile(self._p_blob_uncommitted, mode, self)
+                if self._p_blob_uncommitted is None:
+                    # Create a new working copy
+                    self._create_uncommitted_file()
+                    result = BlobFile(self._p_blob_uncommitted, mode, self)
+                    utils.cp(file(self._p_blob_committed), result)
+                    if mode == 'r+':
+                        result.seek(0)
+                else:
+                    # Re-use existing working copy
+                    result = BlobFile(self._p_blob_uncommitted, mode, self)
 
-            self._p_blob_writers += 1
-            result = uncommitted
+            def destroyed(ref, writers=self.writers):
+                try:
+                    writers.remove(ref)
+                except ValueError:
+                    pass
+            
+            self.writers.append(weakref.ref(result, destroyed))
 
-        else:
-            raise IOError('invalid mode: %s ' % mode)
+            self._p_changed = True
 
-        if result is not None:
-            self._setup_transaction_manager(result)
         return result
 
     def openDetached(self, class_=file):
@@ -123,7 +170,7 @@
         """
         if self._current_filename() is None:
             raise BlobError("Blob does not exist.")
-        if self._p_blob_writers != 0:
+        if self.writers:
             raise BlobError("Already opened for writing.")
         # XXX this should increase the reader number and have a test !?!
         return class_(self._current_filename(), "rb")
@@ -132,9 +179,9 @@
         """Will replace the current data of the blob with the file given under
         filename.
         """
-        if self._p_blob_writers != 0:
+        if self.writers:
             raise BlobError("Already opened for writing.")
-        if self._p_blob_readers != 0:
+        if self.readers:
             raise BlobError("Already opened for reading.")
 
         previous_uncommitted = bool(self._p_blob_uncommitted)
@@ -151,7 +198,7 @@
             os.unlink(target)
 
         try:
-            self._os_link(filename, target)
+            os.rename(filename, target)
         except:
             # Recover from the failed consumption: First remove the file, it
             # might exist and mark the pointer to the uncommitted file.
@@ -175,14 +222,14 @@
 
             # We changed the blob state and have to make sure we join the
             # transaction.
-            self._change()
+            self._p_changed = True
 
     # utility methods
 
     def _current_filename(self):
-        # NOTE: _p_blob_data and _p_blob_uncommitted appear by virtue of
+        # NOTE: _p_blob_committed and _p_blob_uncommitted appear by virtue of
         # Connection._setstate
-        return self._p_blob_uncommitted or self._p_blob_data
+        return self._p_blob_uncommitted or self._p_blob_committed
 
     def _create_uncommitted_file(self):
         assert self._p_blob_uncommitted is None, (
@@ -191,148 +238,6 @@
         self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
         return self._p_blob_uncommitted
 
-    def _change(self):
-        self._p_changed = 1
-
-    def _setup_transaction_manager(self, result):
-        # We join the transaction with our own data manager in order to be
-        # notified of commit/vote/abort events.  We do this because at
-        # transaction boundaries, we need to fix up _p_ reference counts
-        # that keep track of open readers and writers and close any
-        # writable filehandles we've opened.
-        if self._p_blob_manager is None:
-            # Blobs need to always participate in transactions.
-            if self._p_jar is not None:
-                # If we are connected to a database, then we use the
-                # transaction manager that belongs to this connection
-                tm = self._p_jar.transaction_manager
-            else:
-                # If we are not connected to a database, we check whether
-                # we have been given an explicit transaction manager
-                if self._p_blob_transaction:
-                    tm = self._p_blob_transaction
-                else:
-                    # Otherwise we use the default
-                    # transaction manager as an educated guess.
-                    tm = transaction.manager
-            # Create our datamanager and join he current transaction.
-            dm = BlobDataManager(self, result, tm)
-            tm.get().join(dm)
-        elif result:
-            # Each blob data manager should manage only the one blob
-            # assigned to it.  Assert that this is the case and it is the
-            # correct blob
-            assert self._p_blob_manager.blob is self
-            self._p_blob_manager.register_fh(result)
-
-    # utility methods which should not cause the object's state to be
-    # loaded if they are called while the object is a ghost.  Thus,
-    # they are named with the _p_ convention and only operate against
-    # other _p_ instance attributes. We conventionally name these methods
-    # and attributes with a _p_blob prefix.
-
-    def _p_blob_clear(self):
-        self._p_blob_readers = 0
-        self._p_blob_writers = 0
-
-    def _p_blob_decref(self, mode):
-        if mode == 'r':
-            self._p_blob_readers = max(0, self._p_blob_readers - 1)
-        else:
-            assert mode in valid_modes, "Invalid mode %r" % mode
-            self._p_blob_writers = max(0, self._p_blob_writers - 1)
-
-    def _p_blob_refcounts(self):
-        # used by unit tests
-        return self._p_blob_readers, self._p_blob_writers
-
-
-class BlobDataManager:
-    """Special data manager to handle transaction boundaries for blobs.
-
-    Blobs need some special care-taking on transaction boundaries. As
-
-    a) the ghost objects might get reused, the _p_reader and _p_writer
-       refcount attributes must be set to a consistent state
-    b) the file objects might get passed out of the thread/transaction
-       and must deny any relationship to the original blob.
-    c) writable blob filehandles must be closed at the end of a txn so
-       as to not allow reuse between two transactions.
-
-    """
-
-    zope.interface.implements(transaction.interfaces.IDataManager)
-
-    def __init__(self, blob, filehandle, tm):
-        self.blob = blob
-        self.transaction = tm.get()
-        # we keep a weakref to the file handle because we don't want to
-        # keep it alive if all other references to it die (e.g. in the
-        # case it's opened without assigning it to a name).
-        self.fhrefs = utils.WeakSet()
-        self.register_fh(filehandle)
-        self.sortkey = time.time()
-        self.prepared = False
-
-    # Blob specific methods
-
-    def register_fh(self, filehandle):
-        self.fhrefs.add(filehandle)
-
-    def _remove_uncommitted_data(self):
-        self.blob._p_blob_clear()
-        self.fhrefs.map(lambda fhref: fhref.close())
-        if (self.blob._p_blob_uncommitted is not None and
-            os.path.exists(self.blob._p_blob_uncommitted)):
-            os.unlink(self.blob._p_blob_uncommitted)
-            self.blob._p_blob_uncommitted = None
-
-    # IDataManager
-
-    def tpc_begin(self, transaction):
-        if self.prepared:
-            raise TypeError('Already prepared')
-        self._checkTransaction(transaction)
-        self.prepared = True
-        self.transaction = transaction
-        self.fhrefs.map(lambda fhref: fhref.close())
-
-    def commit(self, transaction):
-        if not self.prepared:
-            raise TypeError('Not prepared to commit')
-        self._checkTransaction(transaction)
-        self.transaction = None
-        self.prepared = False
-
-        self.blob._p_blob_clear() 
-
-    def abort(self, transaction):
-        self.tpc_abort(transaction)
-
-    def tpc_abort(self, transaction):
-        self._checkTransaction(transaction)
-        if self.transaction is not None:
-            self.transaction = None
-        self.prepared = False
-
-        self._remove_uncommitted_data()
-
-    def tpc_finish(self, transaction):
-        pass
-
-    def tpc_vote(self, transaction):
-        pass
-
-    def sortKey(self):
-        return self.sortkey
-
-    def _checkTransaction(self, transaction):
-        if (self.transaction is not None and
-            self.transaction is not transaction):
-            raise TypeError("Transaction missmatch",
-                            transaction, self.transaction)
-
-
 class BlobFile(file):
     """A BlobFile that holds a file handle to actual blob data.
 
@@ -349,36 +254,11 @@
     def __init__(self, name, mode, blob):
         super(BlobFile, self).__init__(name, mode+'b')
         self.blob = blob
-        self.close_called = False
-
-    def write(self, data):
-        super(BlobFile, self).write(data)
-        self.blob._change()
-
-    def writelines(self, lines):
-        super(BlobFile, self).writelines(lines)
-        self.blob._change()
-
-    def truncate(self, size=0):
-        super(BlobFile, self).truncate(size)
-        self.blob._change()
-
+            
     def close(self):
-        # we don't want to decref twice
-        if not self.close_called:
-            self.blob._p_blob_decref(self.mode[:-1])
-            self.close_called = True
-            super(BlobFile, self).close()
+        self.blob.closed(self)
+        file.close(self)
 
-    def __del__(self):
-        # XXX we need to ensure that the file is closed at object
-        # expiration or our blob's refcount won't be decremented.
-        # This probably needs some work; I don't know if the names
-        # 'BlobFile' or 'super' will be available at program exit, but
-        # we'll assume they will be for now in the name of not
-        # muddying the code needlessly.
-        self.close()
-
 _pid = str(os.getpid())
 
 def log(msg, level=logging.INFO, subsys=_pid, exc_info=False):

Modified: ZODB/trunk/src/ZODB/tests/blob_basic.txt
===================================================================
--- ZODB/trunk/src/ZODB/tests/blob_basic.txt	2007-06-06 16:21:05 UTC (rev 76435)
+++ ZODB/trunk/src/ZODB/tests/blob_basic.txt	2007-06-06 16:25:12 UTC (rev 76436)
@@ -26,12 +26,10 @@
     >>> IBlob.providedBy(myblob)
     True
 
-Opening a new Blob for reading fails:
+We can open a new blob file for reading, but it won't have any data:
 
-    >>> myblob.open("r")
-    Traceback (most recent call last):
-        ...
-    BlobError: Blob does not exist.
+    >>> myblob.open("r").read()
+    ''
 
 But we can write data to a new Blob by opening it for writing:
 

Modified: ZODB/trunk/src/ZODB/tests/blob_transaction.txt
===================================================================
--- ZODB/trunk/src/ZODB/tests/blob_transaction.txt	2007-06-06 16:21:05 UTC (rev 76435)
+++ ZODB/trunk/src/ZODB/tests/blob_transaction.txt	2007-06-06 16:25:12 UTC (rev 76436)
@@ -35,88 +35,98 @@
     >>> blob1 = Blob()
     >>> blob1.open('w').write('this is blob 1')
     >>> root1['blob1'] = blob1
-    >>> transaction.commit()
+    >>> 'blob1' in root1
+    True
+   
+Aborting a blob add leaves the blob unchanged:
 
-Aborting a transaction involving a blob write cleans up uncommitted
-file data::
+    >>> transaction.abort()
+    >>> 'blob1' in root1
+    False
 
-    >>> dead_blob = Blob()
-    >>> dead_blob.open('w').write('this is a dead blob')
-    >>> root1['dead_blob'] = dead_blob
-    >>> fname = dead_blob._p_blob_uncommitted
+    >>> blob1._p_oid
+    >>> blob1._p_jar
+    >>> blob1.open().read()
+    'this is blob 1'
+
+It doesn't clear the file because there is no previously committed version: 
+
+    >>> fname = blob1._p_blob_uncommitted
     >>> import os
     >>> os.path.exists(fname)
     True
+
+Let's put the blob back into the root and commit the change:
+
+    >>> root1['blob1'] = blob1
+    >>> transaction.commit()
+
+Now, if we make a change and abort it, we'll return to the committed
+state:
+
+    >>> os.path.exists(fname)
+    False
+    >>> blob1._p_blob_uncommitted
+
+    >>> blob1.open('w').write('this is new blob 1')
+    >>> blob1.open().read()
+    'this is new blob 1'
+    >>> fname = blob1._p_blob_uncommitted
+    >>> os.path.exists(fname)
+    True
+
     >>> transaction.abort()
     >>> os.path.exists(fname)
     False
+    >>> blob1._p_blob_uncommitted
 
+    >>> blob1.open().read()
+    'this is blob 1'
+
 Opening a blob gives us a filehandle.  Getting data out of the
 resulting filehandle is accomplished via the filehandle's read method::
 
     >>> connection2 = database.open()
     >>> root2 = connection2.root()
     >>> blob1a = root2['blob1']
-    >>> blob1a._p_blob_refcounts()
-    (0, 0)
-    >>>
+
     >>> blob1afh1 = blob1a.open("r")
     >>> blob1afh1.read()
     'this is blob 1'
-    >>> # The filehandle keeps a reference to its blob object
-    >>> blob1afh1.blob._p_blob_refcounts()
-    (1, 0)
 
-Let's make another filehandle for read only to blob1a, this should bump
-up its refcount by one, and each file handle has a reference to the
-(same) underlying blob::
+Let's make another filehandle for read only to blob1a. Each file
+handle has a reference to the (same) underlying blob::
 
     >>> blob1afh2 = blob1a.open("r")
-    >>> blob1afh2.blob._p_blob_refcounts()
-    (2, 0)
-    >>> blob1afh1.blob._p_blob_refcounts()
-    (2, 0)
     >>> blob1afh2.blob is blob1afh1.blob
     True
 
-Let's close the first filehandle we got from the blob, this should decrease
-its refcount by one::
+Let's close the first filehandle we got from the blob::
 
     >>> blob1afh1.close()
-    >>> blob1a._p_blob_refcounts()
-    (1, 0)
 
 Let's abort this transaction, and ensure that the filehandles that we
-opened are now closed and that the filehandle refcounts on the blob
-object are cleared::
+opened are still open::
 
     >>> transaction.abort()
-    >>> blob1afh1.blob._p_blob_refcounts()
-    (0, 0)
-    >>> blob1afh2.blob._p_blob_refcounts()
-    (0, 0)
-    >>> blob1a._p_blob_refcounts()
-    (0, 0)
     >>> blob1afh2.read()
-    Traceback (most recent call last):
-        ...
-    ValueError: I/O operation on closed file
+    'this is blob 1'
 
-If we open a blob for append, its write refcount should be nonzero.
-Additionally, writing any number of bytes to the blobfile should
-result in the blob being marked "dirty" in the connection (we just
-aborted above, so the object should be "clean" when we start)::
+    >>> blob1afh2.close()
 
+If we open a blob for append, writing any number of bytes to the
+blobfile should result in the blob being marked "dirty" in the
+connection (we just aborted above, so the object should be "clean"
+when we start)::
+
     >>> bool(blob1a._p_changed)
     False
     >>> blob1a.open('r').read()
     'this is blob 1'
     >>> blob1afh3 = blob1a.open('a')
-    >>> blob1afh3.write('woot!')
-    >>> blob1a._p_blob_refcounts()
-    (0, 1)
     >>> bool(blob1a._p_changed)
     True
+    >>> blob1afh3.write('woot!')
 
 We can open more than one blob object during the course of a single
 transaction::
@@ -125,10 +135,6 @@
     >>> blob2.open('w').write('this is blob 3')
     >>> root2['blob2'] = blob2
     >>> transaction.commit()
-    >>> blob2._p_blob_refcounts()
-    (0, 0)
-    >>> blob1._p_blob_refcounts()
-    (0, 0)
 
 Since we committed the current transaction above, the aggregate
 changes we've made to blob, blob1a (these refer to the same object) and
@@ -200,7 +206,7 @@
 Savepoints and Blobs
 --------------------
 
-We do support optimistic savepoints ::
+We do support optimistic savepoints:
 
     >>> connection5 = database.open()
     >>> root5 = connection5.root()
@@ -222,17 +228,16 @@
     "I'm a happy blob. And I'm singing."
     >>> transaction.get().commit()
 
-We do not support non-optimistic savepoints::
+We support non-optimistic savepoints too:
 
-    >>> blob_fh = root5['blob'].open("a")
-    >>> blob_fh.write(" And the weather is beautiful.")
-    >>> blob_fh.close()
+    >>> root5['blob'].open("a").write(" And I'm dancing.")
     >>> root5['blob'].open("r").read()
-    "I'm a happy blob. And I'm singing. And the weather is beautiful."
-    >>> savepoint = transaction.savepoint()             # doctest: +ELLIPSIS
-    Traceback (most recent call last):
-        ...
-    TypeError: ('Savepoints unsupported', <ZODB.blob.BlobDataManager instance at 0x...>)
+    "I'm a happy blob. And I'm singing. And I'm dancing."
+    >>> savepoint = transaction.savepoint()
+    >>> root5['blob'].open("w").write(" And the weather is beautiful.")
+    >>> savepoint.rollback()
+    >>> root5['blob'].open("r").read()
+    "I'm a happy blob. And I'm singing. And I'm dancing."
     >>> transaction.abort()
 
 Reading Blobs outside of a transaction



More information about the Zodb-checkins mailing list