[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/blob.py Reverted accidental checkin.

Jim Fulton jim at zope.com
Mon Jun 4 12:03:23 EDT 2007


Log message for revision 76314:
  Reverted accidental checkin.
  

Changed:
  U   ZODB/trunk/src/ZODB/blob.py

-=-
Modified: ZODB/trunk/src/ZODB/blob.py
===================================================================
--- ZODB/trunk/src/ZODB/blob.py	2007-06-04 15:53:36 UTC (rev 76313)
+++ ZODB/trunk/src/ZODB/blob.py	2007-06-04 16:03:22 UTC (rev 76314)
@@ -16,14 +16,12 @@
 
 import base64
 import logging
-import logging
 import os
 import shutil
 import sys
+import time
 import tempfile
-import threading
-import time
-import weakref
+import logging
 
 import zope.interface
 
@@ -44,114 +42,78 @@
 
 valid_modes = 'r', 'w', 'r+', 'a'
 
-# Threading issues:
-# We want to support closing blob files when they are destroyed.
-# This introduces a threading issue, since a blob file may be destroyed
-# via GC in any thread.
-
-
 class Blob(persistent.Persistent):
     """A BLOB supports efficient handling of large data within ZODB."""
 
     zope.interface.implements(ZODB.interfaces.IBlob)
 
+    _os_link = os.rename
+
+    _p_blob_readers = 0
+    _p_blob_writers = 0
     _p_blob_uncommitted = None  # Filename of the uncommitted (dirty) data
-    _p_blob_committed = None    # Filename of the committed data
+    _p_blob_data = None         # Filename of the committed data
 
-    def __setstate__(self, state=None):
-        # We use lists here because it will allow is to add and remove
-        # atomically
-        self.readers = []
-        self.writers = []
-        
-    __init__ = __setstate__
+    # All persistent objects store a reference to their data manager (a
+    # database connection) in the _p_jar attribute. So we are going to do the
+    # same with blobs here.
+    _p_blob_manager = None
 
-    def __getstate__(self):
-        return None
+    # Blobs need to participate in transactions even when not connected to
+    # a database yet. If you want to use a non-default transaction manager,
+    # you can override it via _p_blob_transaction. This is currently
+    # required for unit testing.
+    _p_blob_transaction = None
 
-    def _p_deactivate(self):
-        # Only ghostify if we are unopened.
-        if self.readers or self.writers:
-            return
-        super(Blob, self)._p_deactivate()
-
-    def _p_invalidate(self):
-        # Force-close any open readers or writers,
-        # XXX should we warn of this? Maybe?
-        for ref in self.readers+self.writers:
-            f = ref()
-            if f is not None:
-                f.close()
-        super(Blob, self)._p_invalidate()
-
-    @property
-    def opened(self):
-        return bool(self.readers or self.writers)
-
-    def closed(self, f):
-        
-        # We use try/except below because another thread might remove
-        # the ref after we check it if the file is GCed.
-
-        for file_refs in (self.readers, self.writers):
-            for ref in self.file_refs:
-                if ref() is f:
-                    try:
-                        file_refs.remove(ref)
-                    except ValueError:
-                        pass
-                    return
-
     def open(self, mode="r"):
+        """Returns a file(-like) object representing blob data."""
+        result = None
+            
         if mode not in valid_modes:
             raise ValueError("invalid mode", mode)
 
-        if self.writers:
-            raise BlobError("Already opened for writing.")
-
         if mode == 'r':
             if self._current_filename() is None:
-                self._create_uncommitted_file()
+                raise BlobError("Blob does not exist.")
 
+            if self._p_blob_writers != 0:
+                raise BlobError("Already opened for writing.")
+
+            self._p_blob_readers += 1
             result = BlobFile(self._current_filename(), mode, self)
 
-            def destroyed(ref, readers=self.readers):
-                try:
-                    readers.remove(ref)
-                except ValueError:
-                    pass
-            
-            self.readers.append(weakref.ref(result, destroyed))
-        else:
-            if self._p_blob_readers:
+        elif mode == 'w':
+            if self._p_blob_readers != 0:
                 raise BlobError("Already opened for reading.")
 
-            if mode == 'w':
-                if self._p_blob_uncommitted is None:
-                    self._create_uncommitted_file()
-                result = BlobFile(self._p_blob_uncommitted, mode, self)
+            self._p_blob_writers += 1
+            if self._p_blob_uncommitted is None:
+                self._create_uncommitted_file()
+            result = BlobFile(self._p_blob_uncommitted, mode, self)
+
+        elif mode in ('a', 'r+'):
+            if self._p_blob_readers != 0:
+                raise BlobError("Already opened for reading.")
+
+            if self._p_blob_uncommitted is None:
+                # Create a new working copy
+                uncommitted = BlobFile(self._create_uncommitted_file(),
+                                       mode, self)
+                # NOTE: _p_blob_data appears by virtue of Connection._setstate
+                utils.cp(file(self._p_blob_data), uncommitted)
+                uncommitted.seek(0)
             else:
-                if self._p_blob_uncommitted is None:
-                    # Create a new working copy
-                    self._create_uncommitted_file()
-                    result = BlobFile(self._p_blob_uncommitted, mode, self)
-                    utils.cp(file(self._p_blob_committed), result)
-                    if mode == 'r+':
-                        result.seek(0)
-                else:
-                    # Re-use existing working copy
-                    result = BlobFile(self._p_blob_uncommitted, mode, self)
+                # Re-use existing working copy
+                uncommitted = BlobFile(self._p_blob_uncommitted, mode, self)
 
-            def destroyed(ref, writers=self.writers):
-                try:
-                    writers.remove(ref)
-                except ValueError:
-                    pass
-            
-            self.writers.append(weakref.ref(result, destroyed))
+            self._p_blob_writers += 1
+            result = uncommitted
 
-            self._p_changed = True
+        else:
+            raise IOError('invalid mode: %s ' % mode)
 
+        if result is not None:
+            self._setup_transaction_manager(result)
         return result
 
     def openDetached(self, class_=file):
@@ -189,7 +151,7 @@
             os.unlink(target)
 
         try:
-            os.rename(filename, target)
+            self._os_link(filename, target)
         except:
             # Recover from the failed consumption: First remove the file, it
             # might exist and mark the pointer to the uncommitted file.
@@ -213,14 +175,14 @@
 
             # We changed the blob state and have to make sure we join the
             # transaction.
-            self._p_changed = True
+            self._change()
 
     # utility methods
 
     def _current_filename(self):
-        # NOTE: _p_blob_committed and _p_blob_uncommitted appear by virtue of
+        # NOTE: _p_blob_data and _p_blob_uncommitted appear by virtue of
         # Connection._setstate
-        return self._p_blob_uncommitted or self._p_blob_committed
+        return self._p_blob_uncommitted or self._p_blob_data
 
     def _create_uncommitted_file(self):
         assert self._p_blob_uncommitted is None, (
@@ -229,6 +191,148 @@
         self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
         return self._p_blob_uncommitted
 
+    def _change(self):
+        self._p_changed = 1
+
+    def _setup_transaction_manager(self, result):
+        # We join the transaction with our own data manager in order to be
+        # notified of commit/vote/abort events.  We do this because at
+        # transaction boundaries, we need to fix up _p_ reference counts
+        # that keep track of open readers and writers and close any
+        # writable filehandles we've opened.
+        if self._p_blob_manager is None:
+            # Blobs need to always participate in transactions.
+            if self._p_jar is not None:
+                # If we are connected to a database, then we use the
+                # transaction manager that belongs to this connection
+                tm = self._p_jar.transaction_manager
+            else:
+                # If we are not connected to a database, we check whether
+                # we have been given an explicit transaction manager
+                if self._p_blob_transaction:
+                    tm = self._p_blob_transaction
+                else:
+                    # Otherwise we use the default
+                    # transaction manager as an educated guess.
+                    tm = transaction.manager
+            # Create our datamanager and join the current transaction.
+            dm = BlobDataManager(self, result, tm)
+            tm.get().join(dm)
+        elif result:
+            # Each blob data manager should manage only the one blob
+            # assigned to it.  Assert that this is the case and it is the
+            # correct blob
+            assert self._p_blob_manager.blob is self
+            self._p_blob_manager.register_fh(result)
+
+    # utility methods which should not cause the object's state to be
+    # loaded if they are called while the object is a ghost.  Thus,
+    # they are named with the _p_ convention and only operate against
+    # other _p_ instance attributes. We conventionally name these methods
+    # and attributes with a _p_blob prefix.
+
+    def _p_blob_clear(self):
+        self._p_blob_readers = 0
+        self._p_blob_writers = 0
+
+    def _p_blob_decref(self, mode):
+        if mode == 'r':
+            self._p_blob_readers = max(0, self._p_blob_readers - 1)
+        else:
+            assert mode in valid_modes, "Invalid mode %r" % mode
+            self._p_blob_writers = max(0, self._p_blob_writers - 1)
+
+    def _p_blob_refcounts(self):
+        # used by unit tests
+        return self._p_blob_readers, self._p_blob_writers
+
+
+class BlobDataManager:
+    """Special data manager to handle transaction boundaries for blobs.
+
+    Blobs need some special care-taking on transaction boundaries. As
+
+    a) the ghost objects might get reused, the _p_blob_readers and
+       _p_blob_writers refcount attributes must be set to a consistent state
+    b) the file objects might get passed out of the thread/transaction
+       and must deny any relationship to the original blob.
+    c) writable blob filehandles must be closed at the end of a txn so
+       as to not allow reuse between two transactions.
+
+    """
+
+    zope.interface.implements(transaction.interfaces.IDataManager)
+
+    def __init__(self, blob, filehandle, tm):
+        self.blob = blob
+        self.transaction = tm.get()
+        # we keep a weakref to the file handle because we don't want to
+        # keep it alive if all other references to it die (e.g. in the
+        # case it's opened without assigning it to a name).
+        self.fhrefs = utils.WeakSet()
+        self.register_fh(filehandle)
+        self.sortkey = time.time()
+        self.prepared = False
+
+    # Blob specific methods
+
+    def register_fh(self, filehandle):
+        self.fhrefs.add(filehandle)
+
+    def _remove_uncommitted_data(self):
+        self.blob._p_blob_clear()
+        self.fhrefs.map(lambda fhref: fhref.close())
+        if (self.blob._p_blob_uncommitted is not None and
+            os.path.exists(self.blob._p_blob_uncommitted)):
+            os.unlink(self.blob._p_blob_uncommitted)
+            self.blob._p_blob_uncommitted = None
+
+    # IDataManager
+
+    def tpc_begin(self, transaction):
+        if self.prepared:
+            raise TypeError('Already prepared')
+        self._checkTransaction(transaction)
+        self.prepared = True
+        self.transaction = transaction
+        self.fhrefs.map(lambda fhref: fhref.close())
+
+    def commit(self, transaction):
+        if not self.prepared:
+            raise TypeError('Not prepared to commit')
+        self._checkTransaction(transaction)
+        self.transaction = None
+        self.prepared = False
+
+        self.blob._p_blob_clear() 
+
+    def abort(self, transaction):
+        self.tpc_abort(transaction)
+
+    def tpc_abort(self, transaction):
+        self._checkTransaction(transaction)
+        if self.transaction is not None:
+            self.transaction = None
+        self.prepared = False
+
+        self._remove_uncommitted_data()
+
+    def tpc_finish(self, transaction):
+        pass
+
+    def tpc_vote(self, transaction):
+        pass
+
+    def sortKey(self):
+        return self.sortkey
+
+    def _checkTransaction(self, transaction):
+        if (self.transaction is not None and
+            self.transaction is not transaction):
+            raise TypeError("Transaction missmatch",
+                            transaction, self.transaction)
+
+
 class BlobFile(file):
     """A BlobFile that holds a file handle to actual blob data.
 
@@ -245,11 +349,36 @@
     def __init__(self, name, mode, blob):
         super(BlobFile, self).__init__(name, mode+'b')
         self.blob = blob
-            
+        self.close_called = False
+
+    def write(self, data):
+        super(BlobFile, self).write(data)
+        self.blob._change()
+
+    def writelines(self, lines):
+        super(BlobFile, self).writelines(lines)
+        self.blob._change()
+
+    def truncate(self, size=0):
+        super(BlobFile, self).truncate(size)
+        self.blob._change()
+
     def close(self):
-        self.blob.closed(self)
-        file.close(self)
+        # we don't want to decref twice
+        if not self.close_called:
+            self.blob._p_blob_decref(self.mode[:-1])
+            self.close_called = True
+            super(BlobFile, self).close()
 
+    def __del__(self):
+        # XXX we need to ensure that the file is closed at object
+        # expiration or our blob's refcount won't be decremented.
+        # This probably needs some work; I don't know if the names
+        # 'BlobFile' or 'super' will be available at program exit, but
+        # we'll assume they will be for now in the name of not
+        # muddying the code needlessly.
+        self.close()
+
 _pid = str(os.getpid())
 
 def log(msg, level=logging.INFO, subsys=_pid, exc_info=False):



More information about the Zodb-checkins mailing list