[Zope3-checkins] CVS: Zope3/src/transaction - notes.txt:1.1.2.1 _transaction.py:1.1.2.1 _manager.py:1.1.2.1 __init__.py:1.4.8.1 README.txt:1.2.8.1

Jeremy Hylton jeremy at zope.com
Thu Mar 18 16:19:04 EST 2004


Update of /cvs-repository/Zope3/src/transaction
In directory cvs.zope.org:/tmp/cvs-serv5681/src/transaction

Modified Files:
      Tag: jeremy-txn-branch
	__init__.py README.txt 
Added Files:
      Tag: jeremy-txn-branch
	notes.txt _transaction.py _manager.py 
Log Message:
Commit working changes for ReviseTransactionAPI to jeremy-txn-branch.

All of the transaction tests pass except for two failures:
    testExceptionInSubAbortSub
    testExceptionInSubCommitSub


=== Added File Zope3/src/transaction/notes.txt ===
Notes on a future transaction API
=================================

I did a brief review of the current transaction APIs from ZODB 3 and
ZODB 4, considering some of the issues that have come up since last
winter when most of the initial design and implementation of ZODB 4's
transaction API was done.

Participants
------------

There are four participants in the transaction APIs.

1. Application -- Some application code is ultimately in charge of the
transaction process.  It uses transactional resources, decides the
scope of individual transactions, and commits or aborts transactions.

2. Resource Manager -- Typically library or framework code that provides
transactional access to some resource -- a ZODB database, a relational
database, or some other resource.  It provides an API for application
code that isn't defined by the transaction framework.  It collaborates
with the transaction manager to find the current transaction.  It
collaborates with the transaction for registration, notification, and
for committing changes.

The ZODB Connection is a resource manager.  In ZODB 4, it is called a
data manager.  In ZODB 3, it is called a jar.  In other literature,
resource manager seems to be common.

3. Transaction -- coordinates the actions of application and resource
managers for a particular activity.  The transaction usually has a
short lifetime.  The application begins it, resources register with it
as the application runs, then it finishes with a commit or abort.

4. Transaction Manager -- coordinates the use of transactions.  The
transaction manager provides policies for associating resource
managers with specific transactions.  The question "What is the
current transaction?" is answered by the transaction manager.

I'm taking as a starting point the transaction API that was defined
for ZODB 4.  I reviewed it again after a lot of time away, and I still
think it's on the right track.

Current transaction
-------------------

The first question is "What is the current transaction?"  This
question is decided by the transaction manager.  An application could
choose the transaction manager that suits its needs best.

In the current ZODB, the transaction manager is essentially the
implementation of ZODB.Transaction.get_transaction() and the
associated thread id -> txn dict.  I think we can encapsulate this
policy in a first-class object and allow applications to decide which
one they want to use.  By default, a thread-based txn manager would be
provided.

The other responsibility of the transaction manager is to decide when
to start a new transaction.  The current ZODB transaction manager
starts one whenever a client calls get() and there is no current
transaction.  I think there could be some benefit to an explicit new()
operation that will always create a new transaction.  A particular
manager could implement the policy that get() called before new()
returns None or raises an exception.
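
A minimal sketch of such a manager, in Python 3 syntax.  The class
names and the NoTransactionError exception are hypothetical; only
get() and new() come from the discussion above.  This variant picks
the strict policy where get() raises if new() has not been called in
the current thread.

```python
import threading


class NoTransactionError(Exception):
    """Raised by get() when new() has not been called (hypothetical)."""


class Transaction:
    """Placeholder; a real transaction would track resource managers."""


class ThreadTransactionManager:
    """Associates each thread with at most one current transaction."""

    def __init__(self):
        self._txns = {}  # thread id -> Transaction
        self._lock = threading.Lock()

    def new(self):
        # Always create a fresh transaction for the calling thread.
        txn = Transaction()
        with self._lock:
            self._txns[threading.get_ident()] = txn
        return txn

    def get(self):
        # Strict policy: never start a transaction implicitly.
        with self._lock:
            txn = self._txns.get(threading.get_ident())
        if txn is None:
            raise NoTransactionError("call new() before get()")
        return txn
```

A manager with the current ZODB behavior would instead create the
transaction inside get() when none exists.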

Basic transaction API
---------------------

A transaction module or package can export a very simple API for
interacting with transactions.  It hides most of the complexity from
applications that want to use the standard Zope policies.  Here's a
sketch of an implementation:

_mgr = TransactionManager()

def get():
    """Return the current transaction."""
    return _mgr.get()

def new():
    """Return a new transaction."""
    return _mgr.new()

def commit():
    """Commit the current transaction."""
    _mgr.get().commit()

def abort():
    """Abort the current transaction."""
    _mgr.get().abort()

Application code can just import the transaction module to use the
get(), new(), abort(), and commit() functions.

The individual transaction objects should have a register() method
that is used by a resource manager to register that it has
modifications for this transaction.  It's part of the basic API, but
not the basic user API.
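
A sketch of how a resource manager might use register(), in Python 3
syntax.  DictResourceManager and its get_transaction argument are
hypothetical names used only for illustration; the point is that the
manager, not the application, makes the register() call.

```python
class Transaction:
    def __init__(self):
        self._resources = []

    def register(self, resource_manager):
        # Registering twice is harmless; keep one entry per manager.
        if resource_manager not in self._resources:
            self._resources.append(resource_manager)


class DictResourceManager:
    """Hypothetical resource manager wrapping a plain dict."""

    def __init__(self, get_transaction):
        # get_transaction is a callable returning the current transaction.
        self._get_transaction = get_transaction
        self.data = {}

    def set(self, key, value):
        # Register with the current transaction before modifying.
        self._get_transaction().register(self)
        self.data[key] = value
```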

Extended transaction API
------------------------

There are a few other methods that might make sense on a transaction:

status() -- return a code or string indicating what state the
transaction is in -- begin, aborted, committed, etc.

note() -- add metadata to txn
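
A rough sketch of these two methods, in Python 3 syntax.  The status
strings are assumptions; the notes do not fix the set of states.

```python
class Transaction:
    def __init__(self):
        self._status = "active"  # assumed name for the initial state
        self.description = ""

    def status(self):
        """Return a string naming the transaction's current state."""
        return self._status

    def note(self, text):
        """Append text to the description, separated by a blank line."""
        text = text.strip()
        if self.description:
            self.description += "\n\n" + text
        else:
            self.description = text

    def commit(self):
        self._status = "committed"

    def abort(self):
        self._status = "aborted"
```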

The transaction module should have a mechanism for installing a new
transaction manager.

Suspend and resume
------------------

If the transaction manager's job is to decide what the current
transaction is, then it would make sense to have suspend() and
resume() APIs that allow the current activity to be stopped for a
time.  The goal of these APIs is to allow more control over
coordination.  

It seems like user code would call suspend() and resume() on
individual transaction objects, which would interact with the
transaction manager.

If suspend() and resume() are supported, then we need to think about
whether those events need to be communicated to the resource
managers. 
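
One possible shape for this, sketched in Python 3 syntax with a
single-threaded manager.  The _suspend() and _resume() hooks on the
manager are hypothetical; notification of resource managers is left
out because it is still an open question.

```python
class Transaction:
    def __init__(self, manager):
        self._manager = manager

    def suspend(self):
        self._manager._suspend(self)

    def resume(self):
        self._manager._resume(self)


class TransactionManager:
    """Single-threaded manager, for illustration only."""

    def __init__(self):
        self._current = None

    def new(self):
        self._current = Transaction(self)
        return self._current

    def get(self):
        return self._current

    def _suspend(self, txn):
        if self._current is not txn:
            raise ValueError("only the current transaction can be suspended")
        self._current = None

    def _resume(self, txn):
        if self._current is not None:
            raise ValueError("another transaction is already current")
        self._current = txn
```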

This is a new feature that isn't needed for ZODB 3.3.

Registration and notification
-----------------------------

The transaction object coordinates the activities of resource
managers.  When a managed resource is modified, its manager must
register with the current transaction.  (It's an error to modify an
object when there is no transaction?)

When the transaction commits or aborts, the transaction calls back to
each registered resource manager.  The callbacks form the two-phase
commit protocol.  I like the ZODB 4 names and approach:  prepare() does
everything from tpc_begin through tpc_vote on the storage.
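
A simplified sketch of the callback sequence, in Python 3 syntax.
It assumes each resource manager offers prepare(), commit(), and
abort() callbacks; RecordingManager is a hypothetical stand-in that
just logs the calls it receives.

```python
class Transaction:
    def __init__(self):
        self._resources = []

    def join(self, rm):
        self._resources.append(rm)

    def commit(self):
        # Phase one: every manager must agree before anything is final.
        try:
            for rm in self._resources:
                rm.prepare(self)
        except Exception:
            for rm in self._resources:
                rm.abort(self)
            raise
        # Phase two: make the prepared changes permanent.
        for rm in self._resources:
            rm.commit(self)


class RecordingManager:
    """Hypothetical resource manager that logs each callback."""

    def __init__(self, name, log, fail=False):
        self.name, self.log, self.fail = name, log, fail

    def prepare(self, txn):
        if self.fail:
            raise RuntimeError("%s cannot commit" % self.name)
        self.log.append((self.name, "prepare"))

    def commit(self, txn):
        self.log.append((self.name, "commit"))

    def abort(self, txn):
        self.log.append((self.name, "abort"))
```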

A resource manager does not register with a transaction if none of its
resources are modified.  Some resource managers would like to know
about transaction boundaries anyway.  A ZODB Connection would like to
process invalidations at every commit, even if none of its objects
were modified.

It's not clear what the notification interface should look like or
what events are of interest.  In theory, transaction begin, abort, and
commit are all interesting; perhaps a combined abort-or-commit event
would be useful.  The ZODB use case only needs one event.

The Java Transaction API has beforeCompletion and afterCompletion,
where the latter is passed a status code to indicate abort or commit.
I think these should be sufficient.
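
A sketch of that pair of callbacks, in Python 3 syntax.  Passing the
status to afterCompletion() follows the Java interface described
above and is an assumption here; the status constants and the
InvalidationSynch listener are hypothetical.

```python
COMMITTED = "committed"
ABORTED = "aborted"


class Transaction:
    def __init__(self, synchronizers=()):
        self._synchronizers = list(synchronizers)

    def _finish(self, status):
        for s in self._synchronizers:
            s.beforeCompletion()
        # ... the real commit or abort work would happen here ...
        for s in self._synchronizers:
            s.afterCompletion(status)

    def commit(self):
        self._finish(COMMITTED)

    def abort(self):
        self._finish(ABORTED)


class InvalidationSynch:
    """Hypothetical listener, e.g. a Connection processing invalidations."""

    def __init__(self):
        self.events = []

    def beforeCompletion(self):
        self.events.append("before")

    def afterCompletion(self, status):
        self.events.append(("after", status))
```

A listener is notified at every boundary whether or not its own
resources were modified, which covers the ZODB Connection use case.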

Nested transactions / savepoints
--------------------------------

ZODB 3 and ZODB 4 each have a limited form of nested transactions.
They are called subtransactions in ZODB 3 and savepoints in ZODB 4.
The essential mechanism is the same:  At the time a subtransaction is
committed, all the modifications up to that time are written out to a
temporary file.  The application can later revert to that saved state
or commit the main transaction, which copies modifications from the
temporary file to the real storage.

The savepoint mechanism can be used to implement the subtransaction
model, by creating a savepoint every time a subtransaction starts or
ends.

If a resource manager joins a transaction after a savepoint, we need
to create an initial savepoint for the new resource manager that will
roll back all its changes.  If the new resource manager doesn't support
savepoints, we probably need to mark earlier savepoints as invalid.
There are some edge cases to work out here.
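
A sketch of the late-joiner case, in Python 3 syntax.  It uses
in-memory snapshots instead of a temporary file, and DictManager is a
hypothetical resource manager.  The case of a manager without
savepoint support is not handled.

```python
class DictManager:
    """Hypothetical resource manager whose state is a dict."""

    def __init__(self):
        self.data = {}

    def savepoint(self):
        # Return a callable that restores the state as of this moment.
        snapshot = dict(self.data)

        def rollback():
            self.data = dict(snapshot)
        return rollback


class Transaction:
    def __init__(self):
        self._resources = []
        self._savepoints = []  # one {manager: rollback} dict per savepoint

    def join(self, rm):
        # A late joiner gets an initial savepoint, taken before it has
        # made any changes, added to every existing transaction savepoint.
        initial = rm.savepoint()
        for rollbacks in self._savepoints:
            rollbacks[rm] = initial
        self._resources.append(rm)

    def savepoint(self):
        rollbacks = {rm: rm.savepoint() for rm in self._resources}
        self._savepoints.append(rollbacks)
        return len(self._savepoints) - 1

    def rollback(self, index):
        for rollback in self._savepoints[index].values():
            rollback()
        # Savepoints taken after this one are no longer meaningful.
        del self._savepoints[index + 1:]
```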

It's not clear how nested transactions affect the transaction manager
API.  If we just use savepoint(), then there's no issue to sort out.
A nested transaction API may be more convenient.  One possibility is
to pass a transaction object to new() indicating that the new
transaction is a child of the current transaction.  Example:

    transaction.new(transaction.get())

That seems rather wordy.  Perhaps:

    transaction.child()

where this creates a new nested transaction that is a child of the
current one, raising an exception if there is no current transaction.

This illustrates that a subtransaction feature could create new
requirements for the transaction manager API.  
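
Both spellings can be sketched on one manager, in Python 3 syntax.
The parent attribute and the choice of RuntimeError are assumptions.

```python
class Transaction:
    def __init__(self, parent=None):
        self.parent = parent


class TransactionManager:
    """Single-threaded manager, for illustration only."""

    def __init__(self):
        self._current = None

    def get(self):
        return self._current

    def new(self, parent=None):
        # Passing a parent makes the new transaction a nested one.
        self._current = Transaction(parent)
        return self._current

    def child(self):
        # Shorthand for new(get()) that insists on a current transaction.
        if self._current is None:
            raise RuntimeError("no current transaction to nest under")
        return self.new(self._current)
```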

The current ZODB 3 API is that calling commit(1) or commit(True) means
"commit a subtransaction."  abort() has the same API.  We need to
support this API for backwards compatibility.  A new API would be a
new feature that isn't necessary for ZODB 3.3.

ZODB Connection and Transactions
--------------------------------

The Connection has three interactions with a transaction manager.
First, it registers itself with the transaction manager for
synchronization messages.  Second, it registers with the current
transaction the first time an object is modified in that transaction.
Third, there is an option to explicitly pass a transaction manager to
the connection constructor via DB.open(); the connection always uses
this transaction manager, regardless of the default manager.

Deadlock and recovery
---------------------

ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers.  The resource
manager must define a sortKey() method that provides a global ordering
for resource managers.  The sort code doesn't exist in ZODB 4, but
could be added fairly easily.
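
The idea is that if every transaction visits its resource managers
in the same global order, two transactions can never wait on each
other in opposite orders.  A sketch in Python 3 syntax; the
ResourceManager class and begin_in_order() helper are hypothetical,
and only sortKey() comes from the existing API.

```python
class ResourceManager:
    def __init__(self, name):
        self.name = name
        self.locked = False

    def sortKey(self):
        # Must be stable and unique across all resource managers.
        return self.name

    def tpc_begin(self, txn):
        # A real manager would acquire its commit lock here.
        self.locked = True


def begin_in_order(resources, txn):
    """Call tpc_begin() on each manager in sortKey() order."""
    ordered = sorted(resources, key=lambda rm: rm.sortKey())
    for rm in ordered:
        rm.tpc_begin(txn)
    return [rm.sortKey() for rm in ordered]
```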

The transaction managers don't support recovery, where recovery means
restoring a system to a consistent state after a failure during the
second phase of two-phase commit.  When a failure occurs in the second
phase, some transaction participants may not know the outcome of the
transaction.  (It would be cool to support recovery, but that's not
being discussed now.)

The absence of a real recovery manager means that our transaction
commit implementation needs to play many tricks to avoid the need for
recovery (pseudo-recovery).  For example, if the first resource
manager fails in the second phase, we attempt to abort all the other
resource managers.  (This isn't strictly correct, because we don't know the
status of the first resource manager if it fails.)  If we used
something more like the ZODB 4 implementation, we'd need to make sure
all the pseudo-recovery work is done in the new implementation.
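
A sketch of that second-phase trick, in Python 3 syntax.  The
finish_all() helper and StubManager are hypothetical; this
illustrates the idea and is not the existing ZODB code.

```python
import logging

log = logging.getLogger("txn")


def finish_all(resources, txn):
    """Run the second phase of commit with best-effort cleanup."""
    finished = 0
    try:
        for rm in resources:
            rm.tpc_finish(txn)
            finished += 1
    except Exception:
        if finished == 0:
            # The first manager failed, so try to abort the others.
            # (Its own status is unknown, as noted above.)
            for rm in resources[1:]:
                try:
                    rm.tpc_abort(txn)
                except Exception:
                    log.exception("tpc_abort failed for %r", rm)
        else:
            # Some managers already finished; without real recovery
            # the system may now be inconsistent.
            log.critical("commit failed after %d manager(s) finished",
                         finished)
        raise


class StubManager:
    """Hypothetical resource manager that can fail in tpc_finish()."""

    def __init__(self, fail=False):
        self.fail = fail
        self.state = "voted"

    def tpc_finish(self, txn):
        if self.fail:
            raise RuntimeError("disk full")
        self.state = "finished"

    def tpc_abort(self, txn):
        self.state = "aborted"
```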

Closing resource managers
-------------------------

The ZODB Connection is explicitly opened and closed by the
application; other resource managers probably get closed too.  The
relationship between transactions and closed resource managers is
undefined in the current API.  A transaction will probably fail if the
Connection is closed, or succeed by accident if the Connection is
re-opened. 

The resource manager - transaction API should include some means for
dealing with close.  The likely approach is to raise an error if you
close a resource manager that is currently registered with a
transaction. 
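
A sketch of that approach, in Python 3 syntax.  The is_joined()
query and the use of RuntimeError are assumptions; the Connection
here is a stand-in and not the ZODB class.

```python
class Transaction:
    def __init__(self):
        self._resources = []

    def join(self, rm):
        self._resources.append(rm)

    def is_joined(self, rm):
        return any(r is rm for r in self._resources)

    def commit(self):
        # Commit work omitted; afterwards no manager is registered.
        self._resources = []


class Connection:
    """Stand-in resource manager that refuses to close mid-transaction."""

    def __init__(self, get_transaction):
        self._get_transaction = get_transaction
        self.closed = False

    def close(self):
        if self._get_transaction().is_joined(self):
            raise RuntimeError("cannot close: registered with a transaction")
        self.closed = True
```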

First steps
-----------

I would definitely like to see some things in ZODB 3.3:

    - simplified module-level transaction calls
    - notifications for abort-commit event
    - restructured Connection to track modified objects itself
    - explicit transaction manager object



=== Added File Zope3/src/transaction/_transaction.py ===
############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
############################################################################
"""Transaction objects manage resources for an individual activity.

Compatibility issues
--------------------

The implementation of Transaction objects involves two layers of
backwards compatibility, because this version of transaction supports
both ZODB 3 and ZODB 4.  Zope is evolving towards the ZODB 4
interfaces.

Transaction has two methods for a resource manager to call to
participate in a transaction -- register() and join().  join() takes a
resource manager and adds it to the list of resources.  register() is
for backwards compatibility.  It takes a persistent object and
registers its _p_jar attribute.  XXX explain adapter

Subtransactions
---------------

A subtransaction applies the transaction notion recursively.  It
allows a set of modifications within a transaction to be committed or
aborted as a group.  A subtransaction is a strictly local activity;
its changes are not visible to any other database connection until the
top-level transaction commits.  In addition to its use to organize a
large transaction, subtransactions can be used to optimize memory use.
ZODB must keep modified objects in memory until a transaction commits
and it can write the changes to the storage.  A subtransaction uses a
temporary disk storage for its commits, allowing modified objects to
be flushed from memory when the subtransaction commits.

The commit() and abort() methods take an optional subtransaction
argument that defaults to false.  If it is true, the operation is
performed on a subtransaction.

Subtransactions add a lot of complexity to the transaction
implementation.  Some resource managers support subtransactions, but
they are not required to.  (ZODB Connection is the only standard
resource manager that supports subtransactions.)  Resource managers
that do support subtransactions implement abort_sub() and commit_sub()
methods and support a second argument to tpc_begin().

The second argument to tpc_begin() indicates that a subtransaction
commit is beginning (if it is true).  In a subtransaction, there is no
tpc_vote() call.  (XXX I don't have any idea why.)  The tpc_finish()
or tpc_abort() call applies just to that subtransaction.

Once a resource manager is involved in a subtransaction, all
subsequent transactions will be treated as subtransactions until
abort_sub() or commit_sub() is called.  abort_sub() will undo all the
changes of the subtransactions.  commit_sub() will begin a top-level
transaction and store all the changes from subtransactions.  After
commit_sub(), the transaction must still call tpc_vote() and
tpc_finish().

If the resource manager does not support subtransactions, nothing
happens when the subtransaction commits.  Instead, the resource
manager is put on a list of managers to commit when the actual
top-level transaction commits.  If this happens, it will not be
possible to abort subtransactions.

Two-phase commit
----------------

Brief description of two-phase commit goes here.

Error handling
--------------

When errors occur during two-phase commit, ...
"""

import logging
import sys
import thread

from ZODB.utils import oid_repr

_marker = object()

def myhasattr(obj, attr):
    return getattr(obj, attr, _marker) is not _marker

class Transaction(object):

    def __init__(self, synchronizers, manager):
        self._resources = []
        self._synchronizers = synchronizers or []
        self._manager = manager
        self._adapters = {}
        
        # The user, description, and _extension attributes are accessed
        # directly by storages, leading underscore notwithstanding.
        self.user = ""
        self.description = ""
        self._extension = {}

        self.log = logging.getLogger("txn.%d" % thread.get_ident())
        self.log.debug("new transaction")

        # _sub contains all of the resource managers involved in
        # subtransactions
        self._sub = {}
        # _nonsub contains all the resource managers that do not
        # support subtransactions that were involved in 
        # subtransaction commits.
        self._nonsub = {}

    def join(self, resource):
        self._resources.append(resource)

    def register(self, obj):
        # XXX This would result in multiple adapters for subtransactions.
        adapter = self._adapters.get(obj._p_jar)
        if adapter is None:
            if myhasattr(obj._p_jar, "commit_sub"):
                adapter = ObjectAdapterSub(obj)
            else:
                adapter = ObjectAdapter(obj)
            self._adapters[obj._p_jar] = adapter
        self.join(adapter)

    def commit(self, subtransaction=False):
        if not subtransaction and self._sub and self._resources:
            # This commit is for a top-level transaction that has
            # previously committed subtransactions.  Do one last
            # subtransaction commit to clear out the current objects,
            # then commit all the subjars.
            self.commit(True)

        if not subtransaction:
            for s in self._synchronizers:
                s.beforeCompletion()

        # XXX check for the hosed state?

        self._commitResources(subtransaction)

        if subtransaction:
            self._resources = []
        else:
            for s in self._synchronizers:
                s.afterCompletion()
            self._manager.free(self)
                
    def _commitResources(self, subtransaction):
        # Execute the two-phase commit protocol.

        print "commit", subtransaction

        L = self._getResourceManagers(subtransaction)
        try:
            for rm in L:
                # If you pass subtransaction=True to tpc_begin(), it
                # will create a temporary storage for the duration of
                # the transaction.  To signal that the top-level
                # transaction is committing, you must then call
                # commit_sub().
                if not subtransaction and id(rm) in self._sub:
                    print "commit_sub", rm
                    rm.commit_sub(self)
                    del self._sub[id(rm)]
                else:
                    print "tpc_begin", subtransaction, rm
                    rm.tpc_begin(self, subtransaction)
                    # XXX what if rm doesn't support subtransactions?
            if not subtransaction:
                # Not sure why, but it is intentional that you do not
                # call tpc_vote() for subtransaction commits.
                for rm in L:
                    rm.tpc_vote(self)
        except:
            print self._sub.keys()
            self._cleanup(L)
            raise
        else:
            for rm in L:
                rm.tpc_finish(self)

    def _cleanup(self, L):
        # Called when an exception occurs during tpc_begin, commit_sub,
        # or tpc_vote.
        for rm in L:
            if not rm.voted:
                rm.cleanup(self)
        for rm in L:
            if id(rm) in self._sub:  # _sub is keyed by id(rm)
                print "cleanup", rm, "abort_sub"
                try:
                    rm.abort_sub(self)
                except Exception, err:
                    print err
            else:
                print "cleanup", rm, "tpc_abort"
                try:
                    rm.tpc_abort(self)
                except Exception, err:
                    print err

    def _getResourceManagers(self, subtransaction):
        L = []
        if subtransaction:
            # If we are in a subtransaction, make sure all resource
            # managers are placed in either _sub or _nonsub.  When
            # the top-level transaction commits, we need to merge
            # these back into the resource set.

            # If a data manager doesn't support sub-transactions, we
            # don't do anything with it now.  (That's somewhat okay,
            # because subtransactions are mostly just an
            # optimization.)  Save it until the top-level transaction
            # commits.

            for rm in self._resources:
                if myhasattr(rm, "commit_sub"):
                    self._sub[id(rm)] = rm
                    L.append(rm)
                else:
                    self._nonsub[id(rm)] = rm
        else:
            if self._sub or self._nonsub:
                # Merge all of _sub, _nonsub, and _resources.
                d = dict(self._sub)
                d.update(self._nonsub)
                for rm in self._resources:
                    d[id(rm)] = rm
                L = d.values()
            else:
                L = list(self._resources)
        
        L.sort(rm_cmp)
        return L
            
    def abort(self, subtransaction=False):
        if not subtransaction:
            for s in self._synchronizers:
                s.beforeCompletion()

        if subtransaction and self._nonsub:
            raise TransactionError("Resource manager does not support "
                                   "subtransaction abort")

        tb = None
        for rm in self._resources:
            try:
                rm.abort(self)
            except:
                if tb is None:
                    t, v, tb = sys.exc_info()
                self.log.error("Failed to abort resource manager: %s",
                               rm, exc_info=sys.exc_info())
                
        if not subtransaction:
            for rm in self._sub.values():
                rm.abort_sub(self)

        if not subtransaction:
            for s in self._synchronizers:
                s.afterCompletion()

        if tb is not None:
            raise t, v, tb

    def note(self, text):
        text = text.strip()
        if self.description:
            self.description += "\n\n" + text
        else:
            self.description = text

    def setUser(self, user_name, path="/"):
        self.user = "%s %s" % (path, user_name)

    def setExtendedInfo(self, name, value):
        self._extension[name] = value

# XXX We need a better name for the adapters.

class ObjectAdapter(object):
    """Adapt the old-style register() call to the new-style join().

    With join(), a resource manager like a Connection registers with
    the transaction manager.  With register(), an individual object
    is passed to register().
    """

    def __init__(self, obj):
        self.manager = obj._p_jar
        self.objects = [obj]
        self.ncommitted = 0
        self.voted = False

    def __repr__(self):
        return "<%s for %s at %s>" % (self.__class__.__name__,
                                      self.manager, id(self))

    def sortKey(self):
        return self.manager.sortKey()

    def tpc_begin(self, txn, sub=False):
        self.manager.tpc_begin(txn, sub)

    def tpc_vote(self, txn):
        for o in self.objects:
            self.manager.commit(o, txn)
            self.ncommitted += 1
        self.manager.tpc_vote(txn)
        self.voted = True

    def cleanup(self, txn):
        self._abort(self.objects[self.ncommitted:], txn)

    def tpc_finish(self, txn):
        self.manager.tpc_finish(txn)

    def tpc_abort(self, txn):
        self.manager.tpc_abort(txn)

    def abort(self, txn):
        self._abort(self.objects, txn)

    def _abort(self, objects, txn):
        tb = None
        for o in objects:
            try:
                self.manager.abort(o, txn)
            except:
                # Capture the first exception and re-raise it after
                # aborting all the other objects.
                if tb is None:
                    t, v, tb = sys.exc_info()
                txn.log.error("Failed to abort object: %s",
                              object_hint(o), exc_info=sys.exc_info())
        if tb is not None:
            raise t, v, tb
                
class ObjectAdapterSub(ObjectAdapter):
    """Adapt resource managers that participate in subtransactions."""

    def commit_sub(self, txn):
        self.manager.commit_sub(txn)

    def abort_sub(self, txn):
        self.manager.abort_sub(txn)

def rm_cmp(rm1, rm2):
    return cmp(rm1.sortKey(), rm2.sortKey())

def object_hint(o):
    """Return a string describing the object.

    This function does not raise an exception.
    """
    
    # We should always be able to get __class__.
    klass = o.__class__.__name__
    # oid would be great, but maybe this isn't a persistent object.
    oid = getattr(o, "_p_oid", None)
    if oid is not None:
        oid = oid_repr(oid)
    return "%s oid=%s" % (klass, oid)
    


=== Added File Zope3/src/transaction/_manager.py ===
############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
############################################################################
"""A TransactionManager controls transaction boundaries.

It coordinates application code and resource managers, so that they
are associated with the right transaction.
"""

import thread

from transaction._transaction import Transaction

class TransactionManager(object):
    """Thread-aware transaction manager.

    Each thread is associated with a unique transaction.
    """

    def __init__(self):
        # _txns maps thread ids to transactions
        self._txns = {}
        # _synchs maps thread ids to a list of registered synchronizers.
        # The list is passed to the Transaction constructor, because
        # it needs to call the synchronizers when it commits.
        self._synchs = {}

    def begin(self):
        tid = thread.get_ident()
        txn = self._txns.get(tid)
        if txn is not None:
            txn.abort()
        txn = self._txns[tid] = Transaction(self._synchs.get(tid), self)
        return txn

    def get(self):
        tid = thread.get_ident()
        txn = self._txns.get(tid)
        if txn is None:
            txn = self._txns[tid] = Transaction(self._synchs.get(tid), self)
        return txn

    def free(self, txn):
        tid = thread.get_ident()
        assert txn is self._txns.get(tid)
        del self._txns[tid]

    def registerSynch(self, synch):
        tid = thread.get_ident()
        L = self._synchs.setdefault(tid, [])
        L.append(synch)

    def unregisterSynch(self, synch):
        tid = thread.get_ident()
        L = self._synchs.get(tid)
        L.remove(synch)



=== Zope3/src/transaction/__init__.py 1.4 => 1.4.8.1 ===
--- Zope3/src/transaction/__init__.py:1.4	Fri Feb 20 11:56:56 2004
+++ Zope3/src/transaction/__init__.py	Thu Mar 18 16:18:33 2004
@@ -12,5 +12,22 @@
 #
 ############################################################################
 
-from ZODB.Transaction import get_transaction
+##from ZODB.Transaction import get_transaction
+
+from transaction._transaction import Transaction
+from transaction._manager import TransactionManager
+
+manager = TransactionManager()
+
+def get():
+    return manager.get()
+
+def begin():
+    return manager.begin()
+
+def commit():
+    manager.get().commit()
+
+def abort():
+    manager.get().abort()
 


=== Zope3/src/transaction/README.txt 1.2 => 1.2.8.1 ===
--- Zope3/src/transaction/README.txt:1.2	Fri Feb 20 11:56:56 2004
+++ Zope3/src/transaction/README.txt	Thu Mar 18 16:18:33 2004
@@ -11,3 +11,4 @@
 were not easy to express in the interface. This could probably use
 more work.  The semantics are presented in detail through examples of
 a sample data manager in transaction.tests.test_SampleDataManager.
+



