[Zope-Checkins] CVS: Zope3/lib/python/Transaction - _defaultTransaction.py:1.3

Jeremy Hylton jeremy@zope.com
Fri, 21 Jun 2002 16:35:04 -0400


Update of /cvs-repository/Zope3/lib/python/Transaction
In directory cvs.zope.org:/tmp/cvs-serv32553

Modified Files:
	_defaultTransaction.py 
Log Message:
Break up commit() into many small pieces.



=== Zope3/lib/python/Transaction/_defaultTransaction.py 1.2 => 1.3 ===
 hosed = 0
 
+already_hosed_text =  \
+           """A serious error, which was probably a system error,
+           occurred in a previous database transaction.  This
+           application may be in an invalid state and must be
+           restarted before database updates can be allowed.
+           
+           Beware though that if the error was due to a serious system
+           problem, such as a disk full condition, then the
+           application may not come up until you deal with the system
+           problem.  See your application log for information on the
+           error that lead to this problem.
+           """
+
+hosed_text = \
+           """A storage error occurred in the last phase of a
+           two-phase commit.  This shouldn't happen.  The application
+           may be in a hosed state, so transactions will not be
+           allowed to commit until the site/storage is reset by a
+           restart.  """
+           
+
 class Transaction:
     """Simple transaction objects for single-threaded applications."""
     
@@ -141,181 +162,196 @@
     def commit(self, subtransaction=None):
         """Finalize the transaction."""
 
+        # This method does all the setup, then calls _commit().
+
         global hosed
-        
-        objects=self._objects
-        jars={}
+
+        # Make a local reference to objects, which will be rebound to
+        # a new list if there is an implicit subtransaction commit
+        objects = self._objects
+        jars = {}
         jarsv = None
-        subj=self._sub
-        subjars=()
+        subjars = ()
 
         if subtransaction:
-            if subj is None: self._sub=subj={}
+            if self._sub is None:
+                self._sub = {}
         else:
-            if subj is not None:
+            if self._sub is not None:
                 if objects:
                     # Do an implicit sub-transaction commit:
                     self.commit(1)
-                    objects=[]
-                subjars=subj.values()
-                self._sub=None
+                    objects = []
+                subjars = self._sub.values()
+                self._sub = None
 
         # If not a subtransaction, then we need to add any non-
         # subtransaction-supporting objects that may have been
         # stowed away during subtransaction commits to _objects.
         if (subtransaction is None) and (self._non_st_objects is not None):
-            append=objects.append
-            for object in self._non_st_objects:
-                append(object)
+            objects.extend(self._non_st_objects)
             self._non_st_objects = None
 
-        t=v=tb=None
+        t = v = tb = None
 
         if (objects or subjars) and hosed:
             # Something really bad happened and we don't
             # trust the system state.
-            raise TransactionError, (
-                
-                """A serious error, which was probably a system error,
-                occurred in a previous database transaction.  This
-                application may be in an invalid state and must be
-                restarted before database updates can be allowed.
-
-                Beware though that if the error was due to a serious
-                system problem, such as a disk full condition, then
-                the application may not come up until you deal with
-                the system problem.  See your application log for
-                information on the error that lead to this problem.
-                """)
+            raise TransactionError(already_hosed_text)
 
         try:
+            self._commit(objects, jars, subtransaction, subjars)
+        finally:
+            tb = None
+            del objects[:] # clear registered
+            if not subtransaction and self._id is not None:
+                free_transaction()
 
-            # It's important that:
-            #
-            # - Every object in self._objects is either committed
-            #   or aborted.
-            #
-            # - For each object that is committed
-            #   we call tpc_begin on it's jar at least once
-            #
-            # - For every jar for which we've called tpc_begin on,
-            #   we either call tpc_abort or tpc_finish. It is OK
-            #   to call these multiple times, as the storage is
-            #   required to ignore these calls if tpc_begin has not
-            #   been called.
-            
-            ncommitted=0
-            try:
-                for o in objects:
-                    j=getattr(o, '_p_jar', o)
-                    if j is not None:
-                        i=id(j)
-                        if not (i in jars):
-                            jars[i]=j
-                            if subtransaction:
-
-                                # If a jar does not support subtransactions,
-                                # we need to save it away to be committed in
-                                # the outer transaction.
-                                try:
-                                    j.tpc_begin(self, subtransaction)
-                                except TypeError:
-                                    j.tpc_begin(self)
-
-                                if hasattr(j, 'commit_sub'):
-                                    subj[i] = j
-                                else:
-                                    if self._non_st_objects is None:
-                                        self._non_st_objects = []
-                                    self._non_st_objects.append(o)
-                                    continue
-
-                            else:
-                                j.tpc_begin(self)
-                        j.commit(o, self)
-                    ncommitted = ncommitted+1
-
-                # Commit work done in subtransactions
-                while subjars:
-                    j=subjars.pop()
-                    i=id(j)
-                    if not (i in jars):
-                        jars[i]=j
-                    j.commit_sub(self)
-
-                jarsv = jars.values()
-                for jar in jarsv:
-                    if not subtransaction:
-                        jar.tpc_vote(self)
+    def _commit(self, objects, jars, subtransaction, subjars):
+        # Do the real work of commit
+        
+        # It's important that:
+        #
+        # - Every object in self._objects is either committed
+        #   or aborted.
+        #
+        # - For each object that is committed
+        #   we call tpc_begin on its jar at least once
+        #
+        # - For every jar for which we've called tpc_begin on,
+        #   we either call tpc_abort or tpc_finish. It is OK
+        #   to call these multiple times, as the storage is
+        #   required to ignore these calls if tpc_begin has not
+        #   been called.
 
-                try:
-                    # Try to finish one jar, since we may be able to
-                    # recover if the first one fails.
-                    if jarsv:
-                        jarsv[-1].tpc_finish(self) # This should never fail
-                        jarsv.pop() # It didn't, so it's taken care of.
-                except:
-                    # Bug if it does, we need to keep track of it
-                    LOG("Transaction", ERROR,
-                        "A storage error occurred in the last phase of a "
-                        "two-phase commit.  This shouldn\'t happen. ",
-                        error=sys.exc_info())
-                    raise
+        ncommitted = 0
+        jarsv = None
+        try:
+            for o in objects:
+                j = getattr(o, '_p_jar', o)
+                if j is not None:
+                    i = id(j)
+                    if i not in jars:
+                        jars[i] = j
+                        
+                        if subtransaction:
+                            if not self._subtrans_begin(o, j, subtransaction):
+                                # The jar does not support subtransactions
+                                continue
+                        else:
+                            j.tpc_begin(self)
+                    j.commit(o, self)
+                ncommitted += 1
+
+            jars.update(self._subtrans_commit(subjars))
+
+            jarsv = jars.values()
+            for jar in jarsv:
+                if not subtransaction:
+                    jar.tpc_vote(self)
+
+            # commit one first, because we can still recover if only
+            # the first one fails
+            self._commit_one(jarsv)
+            self._commit_rest(jarsv)
+        except:
+            t, v, tb = sys.exc_info()
 
-                try:
-                    while jarsv:
-                        jarsv[-1].tpc_finish(self) # This should never fail
-                        jarsv.pop() # It didn't, so it's taken care of.
-                except:                        
-                    # Bug if it does, we need to yell FIRE!
-                    # Someone finished, so don't allow any more
-                    # work without at least a restart!
-                    hosed=1
-                    LOG("Transaction", PANIC,
-                        "A storage error occurred in the last phase of a "
-                        "two-phase commit.  This shouldn't happen. "
-                        "The application may be in a hosed state, so "
-                        "transactions will not be allowed to commit "
-                        "until the site/storage is reset by a restart. ",
-                        error=sys.exc_info())
-                    raise
-                
-            except:
-                t, v, tb = sys.exc_info()
+            if jarsv is None:
+                cleanup_jars = jars.values()
+            else:
+                cleanup_jars = jarsv
+            self._commit_failed(objects[ncommitted:], cleanup_jars, subjars)
 
-                # Ugh, we got an got an error during commit, so we
-                # have to clean up.
+            raise t, v, tb
 
-                # First, we have to abort any uncommitted objects.
-                for o in objects[ncommitted:]:
-                    try:
-                        j=getattr(o, '_p_jar', o)
-                        if j is not None: j.abort(o, self)
-                    except: pass
-
-                # Then, we unwind TPC for the jars that began it.
-                if jarsv is None:
-                    jarsv = jars.values()
-                for j in jarsv:
-                    try: j.tpc_abort(self) # This should never fail
-                    except:     
-                        LOG("Transaction", ERROR,
-                            "A storage error occured during object abort "
-                            "This shouldn't happen. ",
-                            error=sys.exc_info())
-                
-                # Ugh, we need to abort work done in sub-transactions.
-                while subjars:
-                    j=subjars.pop()
-                    j.abort_sub(self) # This should never fail
+    def _commit_one(self, jarsv):
+        try:
+            # Try to finish one jar, since we may be able to recover
+            # if the first one fails.
+            if jarsv:
+                jarsv[-1].tpc_finish(self) # This should never fail
+                jarsv.pop() # It didn't, so it's taken care of.
+        except:
+            # But if it does, we need to keep track of it
+            LOG("Transaction", ERROR,
+                "A storage error occurred in the last phase of a "
+                "two-phase commit.  This shouldn\'t happen. ",
+                error=sys.exc_info())
+            raise
 
-                raise t,v,tb
+    def _commit_rest(self, jarsv):
+        global hosed
+        try:
+            while jarsv:
+                jarsv[-1].tpc_finish(self) # This should never fail
+                jarsv.pop() # It didn't, so it's taken care of.
+        except:                        
+            # But if it does, we need to yell FIRE!  Someone finished,
+            # so don't allow any more work without at least a restart!
+            hosed = 1
+            LOG("Transaction", PANIC, hosed_text, error=sys.exc_info())
+            raise
+
+    def _commit_failed(self, uncommitted, jarsv, subjars):
+        # Ugh, we got an error during commit, so we have to
+        # clean up.
 
-        finally:
-            tb = None
-            del objects[:] # clear registered
-            if not subtransaction and self._id is not None:
-                free_transaction()
+        # First, we have to abort any uncommitted objects.
+        for o in uncommitted:
+            try:
+                j = getattr(o, '_p_jar', o)
+                if j is not None:
+                    j.abort(o, self)
+            except:
+                LOG("Transaction", ERROR,
+                    "An error occured while cleaning up a failed commit.",
+                    error=sys.exc_info())
+
+        # Then, we unwind TPC for the jars that began it.
+        for j in jarsv:
+            try:
+                j.tpc_abort(self) # This should never fail
+            except:     
+                LOG("Transaction", ERROR,
+                    "A storage error occured during object abort "
+                    "This shouldn't happen. ",
+                    error=sys.exc_info())
+
+        # Ugh, we need to abort work done in sub-transactions.
+        for j in subjars:
+            j.abort_sub(self) # This should never fail
+            # XXX Shouldn't we have a try-except anyway?
+
+    def _subtrans_begin(self, obj, jar, subtransaction):
+        # If a jar does not support subtransactions, we need to save
+        # the object away to be committed in the outer transaction.
+        
+        # XXX I think subtransaction is always 1, and, thus, doesn't
+        # need to be passed
+        assert subtransaction == 1
+        try:
+            jar.tpc_begin(self, subtransaction)
+        except TypeError:
+            jar.tpc_begin(self)
+
+        if hasattr(jar, 'commit_sub'):
+            self._sub[id(jar)] = jar
+            return 1
+        else:
+            if self._non_st_objects is None:
+                self._non_st_objects = []
+            self._non_st_objects.append(obj)
+            return 0
+
+    def _subtrans_commit(self, subjars):
+        jars = {}
+        # Commit work done in subtransactions
+        while subjars:
+            j = subjars.pop()
+            jars[id(j)] = j
+            j.commit_sub(self)
+        return jars
 
     def register(self, object):
         """Register the given object for transaction control."""