[Zodb-checkins] CVS: Zope3/src/transaction - txn.py:1.3 manager.py:1.3 interfaces.py:1.3 __init__.py:1.3

Jeremy Hylton jeremy@zope.com
Wed, 5 Mar 2003 17:12:41 -0500


Update of /cvs-repository/Zope3/src/transaction
In directory cvs.zope.org:/tmp/cvs-serv17811/transaction

Modified Files:
	txn.py manager.py interfaces.py __init__.py 
Log Message:
Merge jeremy-atomic-invalidation-branch.

Add suspend() and resume() to transaction manager API.
Change implementation so that the thread-aware manager does not
inherit from the thread-agnostic manager.
Add suspended transaction state.


=== Zope3/src/transaction/txn.py 1.2 => 1.3 ===
--- Zope3/src/transaction/txn.py:1.2	Wed Dec 25 09:12:14 2002
+++ Zope3/src/transaction/txn.py	Wed Mar  5 17:12:38 2003
@@ -4,6 +4,7 @@
 __metaclass__ = type
 
 from transaction.interfaces import ITransaction, TransactionError
+from threading import Lock
 
 class Set(dict):
 
@@ -19,6 +20,7 @@
     COMMITTED = "Committed"
     ABORTING = "Aborting"
     ABORTED = "Aborted"
+    SUSPENDED = "Suspended"
 
 class Transaction:
 
@@ -28,7 +30,9 @@
         self._manager = manager
         self._parent = parent
         self._status = Status.ACTIVE
+        self._suspend = None
         self._resources = Set()
+        self._lock = Lock()
 
     def __repr__(self):
         return "<%s %X %s>" % (self.__class__.__name__, id(self), self._status)
@@ -69,3 +73,25 @@
     def status(self):
         """Return the status of the transaction."""
         return self._status
+
+    def suspend(self):
+        self._lock.acquire()
+        try:
+            if self._status == Status.SUSPENDED:
+                raise TransactionError("Already suspended")
+            self._manager.suspend(self)
+            self._suspend = self._status
+            self._status = Status.SUSPENDED
+        finally:
+            self._lock.release()
+
+    def resume(self):
+        self._lock.acquire()
+        try:
+            if self._status != Status.SUSPENDED:
+                raise TransactionError("Can only resume suspended transaction")
+            self._manager.resume(self)
+            self._status = self._suspend
+            self._suspend = None
+        finally:
+            self._lock.release()


=== Zope3/src/transaction/manager.py 1.2 => 1.3 ===
--- Zope3/src/transaction/manager.py:1.2	Wed Dec 25 09:12:14 2002
+++ Zope3/src/transaction/manager.py	Wed Mar  5 17:12:38 2003
@@ -1,26 +1,19 @@
 import logging
 
-from transaction.interfaces import IRollback
-from transaction.txn import Transaction, Status
+from transaction.interfaces import *
+from transaction.txn import Transaction, Status, Set
 
 # XXX need to change asserts of transaction status into explicit checks
 # that raise some exception
 
 # XXX need lots of error checking
 
-class TransactionManager(object):
-
-    txn_factory = Transaction
-
-    def __init__(self):
-        self.logger = logging.getLogger("txn")
-
-    def new(self):
-        txn = self.txn_factory(self)
-        self.logger.debug("%s: begin", txn)
-        return txn
-
+class AbstractTransactionManager(object):
+    # base class to provide commit logic
+    # concrete class must provide logger attribute
+    
     def commit(self, txn):
+        # commit calls _finishCommit() or abort()
         assert txn._status is Status.ACTIVE
         txn._status = Status.PREPARING
         prepare_ok = True
@@ -35,11 +28,11 @@
         txn._status = Status.PREPARED
         # XXX An error below is intolerable.  What state to use?
         if prepare_ok:
-            self._commit(txn)
+            self._finishCommit(txn)
         else:
             self.abort(txn)
 
-    def _commit(self, txn):
+    def _finishCommit(self, txn):
         self.logger.debug("%s: commit", txn)
         # finish the two-phase commit
         for r in txn._resources:
@@ -58,6 +51,36 @@
         self.logger.debug("%s: savepoint", txn)
         return Rollback([r.savepoint(txn) for r in txn._resources])
 
+class TransactionManager(AbstractTransactionManager):
+
+    txn_factory = Transaction
+
+    __implements__ = ITransactionManager
+
+    def __init__(self):
+        self.logger = logging.getLogger("txn")
+        self._current = None
+
+    def get(self):
+        if self._current is None:
+            self._current = self.begin()
+        return self._current
+
+    def begin(self):
+        txn = self.txn_factory(self)
+        self.logger.debug("%s: begin", txn)
+        return txn
+
+    def commit(self, txn):
+        super(TransactionManager, self).commit(txn)
+        self._current = None
+
+    def abort(self, txn):
+        super(TransactionManager, self).abort(txn)
+        self._current = None
+
+    # XXX need suspend and resume
+
 class Rollback(object):
 
     __implements__ = IRollback
@@ -72,24 +95,46 @@
 # make the transaction manager visible to client code
 import thread
 
-class ThreadedTransactionManager(TransactionManager):
+class ThreadedTransactionManager(AbstractTransactionManager):
+
+    # XXX Do we need locking on _pool or _suspend?
+
+    # Most methods read and write pool based on the id of the current
+    # thread, so they should never interfere with each other.
+
+    # The suspend() and resume() methods modify the _suspend set,
+    # but suspend() only adds a new thread.  The resume() method
+    # does need a lock to prevent two different threads from resuming
+    # the same transaction.
+
+    __implements__ = ITransactionManager
 
     def __init__(self):
-        TransactionManager.__init__(self)
+        self.logger = logging.getLogger("txn")
         self._pool = {}
+        self._suspend = Set()
+        self._lock = thread.allocate_lock()
 
-    def new(self):
+    def get(self):
         tid = thread.get_ident()
         txn = self._pool.get(tid)
         if txn is None:
-            txn = super(ThreadedTransactionManager, self).new()
-            self._pool[tid] = txn
+            txn = self.begin()
+        return txn
+
+    def begin(self):
+        tid = thread.get_ident()
+        txn = self._pool.get(tid)
+        if txn is not None:
+            txn.abort()
+        txn = self.txn_factory(self)
+        self._pool[tid] = txn
         return txn
 
-    def _commit(self, txn):
+    def _finishCommit(self, txn):
         tid = thread.get_ident()
         assert self._pool[tid] is txn
-        super(ThreadedTransactionManager, self)._commit(txn)
+        super(ThreadedTransactionManager, self)._finishCommit(txn)
         del self._pool[tid]
 
     def abort(self, txn):
@@ -97,3 +142,27 @@
         assert self._pool[tid] is txn
         super(ThreadedTransactionManager, self).abort(txn)
         del self._pool[tid]
+
+    # XXX should we require that the transaction calling suspend()
+    # be the one that is using the transaction?
+
+    # XXX need to add locking to suspend() and resume()
+
+    def suspend(self, txn):
+        tid = thread.get_ident()
+        if self._pool[tid] is txn:
+            self._suspend.add(txn)
+            del self._pool[tid]
+        else:
+            raise TransactionError("txn %s not owned by thread %s" %
+                                   (txn, tid))
+
+    def resume(self, txn):
+        tid = thread.get_ident()
+        if self._pool.get(tid) is not None:
+            raise TransactionError("thread %s already has transaction" %
+                                   tid)
+        if txn not in self._suspend:
+            raise TransactionError("unknown transaction: %s" % txn)
+        del self._suspend[txn]
+        self._pool[tid] = txn


=== Zope3/src/transaction/interfaces.py 1.2 => 1.3 ===
--- Zope3/src/transaction/interfaces.py:1.2	Wed Dec 25 09:12:14 2002
+++ Zope3/src/transaction/interfaces.py	Wed Mar  5 17:12:38 2003
@@ -75,3 +75,43 @@
 
     def status():
         """Return status of the current transaction."""
+
+    def suspend():
+        """Suspend the current transaction.
+
+        If a transaction is suspended, the transaction manager no
+        longer treats it as active.  The resume() method must be
+        called before the transaction can be used.
+        """
+
+    def resume():
+        """Resume the current transaction.
+
+        If another transaction is active, it must be suspended before
+        resume() is called.
+        """
+
+class ITransactionManager(Interface):
+    """Coordinates application use of transactional resources."""
+
+    def get():
+        """Return the curren transaction.
+
+        Calls new() to start a new transaction if one does not exist.
+        """
+
+    def begin():
+        """Return a new transaction.
+
+        If a transaction is currently active for the calling thread,
+        it is aborted.
+        """
+
+    def commit(txn):
+        """Commit txn."""
+
+    def abort(txn):
+        """Abort txn."""
+
+    def savepoint(txn):
+        """Return rollback object that can restore txn to current state."""


=== Zope3/src/transaction/__init__.py 1.2 => 1.3 ===
--- Zope3/src/transaction/__init__.py:1.2	Wed Dec 25 09:12:14 2002
+++ Zope3/src/transaction/__init__.py	Wed Mar  5 17:12:38 2003
@@ -15,7 +15,7 @@
 from transaction.manager import ThreadedTransactionManager
 
 _manager = ThreadedTransactionManager()
-get_transaction = _manager.new
+get_transaction = _manager.get
 
 def set_factory(factory):
     _manager.txn_factory = factory