[Zope3-checkins] CVS: Zope3/src/zodb/storage/tests - __init__.py:1.1.2.1 base.py:1.1.2.1 basic.py:1.1.2.1 conflict.py:1.1.2.1 corruption.py:1.1.2.1 history.py:1.1.2.1 iterator.py:1.1.2.1 local.py:1.1.2.1 minpo.py:1.1.2.1 mt.py:1.1.2.1 packable.py:1.1.2.1 persistent.py:1.1.2.1 readonly.py:1.1.2.1 recovery.py:1.1.2.1 revision.py:1.1.2.1 speed.py:1.1.2.1 synchronization.py:1.1.2.1 test_autopack.py:1.1.2.1 test_config.py:1.1.2.1 test_create.py:1.1.2.1 test_file.py:1.1.2.1 test_fsindex.py:1.1.2.1 test_mapping.py:1.1.2.1 test_storage_api.py:1.1.2.1 test_virgin.py:1.1.2.1 test_whitebox.py:1.1.2.1 test_zodb_simple.py:1.1.2.1 timeiter.py:1.1.2.1 timepickles.py:1.1.2.1 undo.py:1.1.2.1 undoversion.py:1.1.2.1 version.py:1.1.2.1

Jim Fulton jim@zope.com
Mon, 23 Dec 2002 14:30:52 -0500


Update of /cvs-repository/Zope3/src/zodb/storage/tests
In directory cvs.zope.org:/tmp/cvs-serv19908/zodb/storage/tests

Added Files:
      Tag: NameGeddon-branch
	__init__.py base.py basic.py conflict.py corruption.py 
	history.py iterator.py local.py minpo.py mt.py packable.py 
	persistent.py readonly.py recovery.py revision.py speed.py 
	synchronization.py test_autopack.py test_config.py 
	test_create.py test_file.py test_fsindex.py test_mapping.py 
	test_storage_api.py test_virgin.py test_whitebox.py 
	test_zodb_simple.py timeiter.py timepickles.py undo.py 
	undoversion.py version.py 
Log Message:
Initial renaming before debugging

=== Added File Zope3/src/zodb/storage/tests/__init__.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
# Having this makes debugging better.


=== Added File Zope3/src/zodb/storage/tests/base.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################

# Base class for unit tests at the ZODB layer

import os
import errno

from zodb.db import DB
from transaction import get_transaction


# Directory (relative to the test cwd) used as the Berkeley db home.
DBHOME = 'test-db'



class ZODBTestBase(BerkeleyTestBase):
    """Berkeley-storage test base that also opens a full ZODB database.

    Provides self._db, self._conn and self._root on top of the storage
    created by BerkeleyTestBase.

    NOTE(review): BerkeleyTestBase is defined *later* in this
    concatenated module -- confirm the definition order in the real
    base.py, otherwise this class statement raises NameError at import.
    """

    def setUp(self):
        # Build the storage first, then layer a DB/connection/root on it.
        BerkeleyTestBase.setUp(self)
        self._db = None
        try:
            self._db = DB(self._storage)
            self._conn = self._db.open()
            self._root = self._conn.root()
        except:
            # Partial setup must not leak into the next test; clean up
            # whatever was created, then re-raise the original error.
            self.tearDown()
            raise

    def _close(self):
        # Closing the DB also closes the underlying storage, so drop all
        # references in one go to avoid double-close in tearDown.
        if self._db is not None:
            self._db.close()
            self._db = self._storage = self._conn = self._root = None

    def tearDown(self):
        # If the tests exited with any uncommitted objects, they'll blow up
        # subsequent tests because the next transaction commit will try to
        # commit those objects.  But they're tied to closed databases, so
        # that's broken.  Aborting the transaction now saves us the headache.
        try:
            get_transaction().abort()
            self._close()
        finally:
            BerkeleyTestBase.tearDown(self)



# Basic test framework class for both the Full and Minimal Berkeley storages

import os
import errno

from zodb.storage.base import BerkeleyConfig


# Directory used as the Berkeley db home.  NOTE(review): duplicates the
# DBHOME defined earlier in this concatenated module -- confirm intended.
DBHOME = 'test-db'



class BerkeleyTestBase(StorageTestBase):
    def _zap_dbhome(self, dir):
        # If the tests exited with any uncommitted objects, they'll blow up
        # subsequent tests because the next transaction commit will try to
        # commit those object.  But they're tied to closed databases, so
        # that's broken.  Aborting the transaction now saves us the headache.
        try:
            for file in os.listdir(dir):
                os.unlink(os.path.join(dir, file))
            os.removedirs(dir)
        except OSError, e:
            if e.errno <> errno.ENOENT:
                raise

    def _mk_dbhome(self, dir):
        # Checkpointing just slows the tests down because we have to wait for
        # the thread to properly shutdown.  This can take up to 10 seconds, so
        # for the purposes of the test suite we shut off this thread.
        config = BerkeleyConfig()
        config.interval = 0
        os.mkdir(dir)
        try:
            return self.ConcreteStorage(dir, config=config)
        except:
            self._zap_dbhome(dir)
            raise

    def setUp(self):
        StorageTestBase.setUp(self)
        self._zap_dbhome(DBHOME)
        self._storage = self._mk_dbhome(DBHOME)

    def tearDown(self):
        StorageTestBase.tearDown(self)
        self._zap_dbhome(DBHOME)



class MinimalTestBase(BerkeleyTestBase):
    # Test base wired to the Berkeley "minimal" storage implementation.
    from zodb.storage.bdbminimal import BDBMinimalStorage
    ConcreteStorage = BDBMinimalStorage


class FullTestBase(BerkeleyTestBase):
    # Test base wired to the Berkeley "full" storage implementation.
    from zodb.storage.bdbfull import BDBFullStorage
    ConcreteStorage = BDBFullStorage


"""Provide a mixin base class for storage tests.

The StorageTestBase class provides basic setUp() and tearDown()
semantics (which you can override), and it also provides a helper
method _dostore() which performs a complete store transaction for a
single object revision.
"""

import errno
import os
import string
import sys
import types
import unittest
from cPickle import Pickler, Unpickler
from cStringIO import StringIO

from zodb.serialize import ConnectionObjectReader
from zodb.ztransaction import Transaction

from zodb.storage.tests.minpo import MinPO

# The null serial number: eight zero bytes (revision id of a new object).
ZERO = '\0'*8

def zodb_pickle(obj):
    """Create a pickle in the format expected by ZODB.

    The result is two consecutive pickles: a (module, classname, None)
    class spec followed by the object's state.
    """
    buffer = StringIO()
    pickler = Pickler(buffer, 1)
    cls = obj.__class__
    module = getattr(cls, "__module__", None)
    state = obj.__getstate__()
    # XXX the third element of the class spec is unused here
    pickler.dump((module, cls.__name__, None))
    pickler.dump(state)
    return buffer.getvalue(1)

def zodb_unpickle(data):
    """Unpickle an object stored using the format expected by ZODB."""
    reader = ConnectionObjectReader(None, {})
    return reader.getObject(data)

def handle_all_serials(oid, *args):
    """Return dict of oid to serialno from store() and tpc_vote().

    Raises an exception if one of the calls raised an exception.

    The storage interface got complicated when ZEO was introduced.
    Any individual store() call can return None or a sequence of
    2-tuples where the 2-tuple is either oid, serialno or an
    exception to be raised by the client.

    The original interface just returned the serialno for the
    object.
    """
    d = {}
    for arg in args:
        # isinstance(arg, str) is equivalent to the old
        # types.StringType test and works on newer Pythons too.
        if isinstance(arg, str):
            # A bare string is the serialno for the current oid.
            d[oid] = arg
        elif arg is None:
            pass
        else:
            # A sequence of (oid, serial) pairs.  Note: this rebinds
            # `oid`, so a later bare-string arg is keyed by the last
            # oid seen here -- preserved from the original behavior.
            for oid, serial in arg:
                if not isinstance(serial, str):
                    raise serial # error from ZEO server
                d[oid] = serial
    return d

def handle_serials(oid, *args):
    """Return the serialno assigned to `oid` by a store transaction.

    Thin convenience wrapper around handle_all_serials().
    """
    serials = handle_all_serials(oid, *args)
    return serials[oid]

def import_helper(name):
    """Import (if needed) and return the module named ``name``.

    __import__ returns the top-level package for dotted names, so the
    actual (sub)module is fetched from sys.modules instead.
    """
    __import__(name)
    return sys.modules[name]

def removefs(base):
    """Remove all files created by FileStorage with path base."""
    for ext in '', '.old', '.tmp', '.lock', '.index', '.pack':
        path = base + ext
        try:
            os.remove(path)
        except os.error, err:
            # A missing file is expected -- not every extension is
            # always created.  Use the documented .errno attribute
            # rather than tuple-indexing the exception (err[0]).
            if err.errno != errno.ENOENT:
                raise


class StorageTestBase(unittest.TestCase):
    """Mixin base providing a storage-agnostic _dostore() helper.

    Subclasses' setUp() must create self._storage; tearDown() closes it.
    """

    # XXX It would be simpler if concrete tests didn't need to extend
    # setUp() and tearDown().

    def setUp(self):
        # You need to override this with a setUp that creates self._storage
        self._storage = None

    def _close(self):
        # You should override this if closing your storage requires additional
        # shutdown operations.
        if self._storage is not None:
            self._storage.close()

    def tearDown(self):
        self._close()

    def _dostore(self, oid=None, revid=None, data=None, version=None,
                 already_pickled=0, user=None, description=None):
        """Do a complete storage transaction.  The defaults are:

         - oid=None, ask the storage for a new oid
         - revid=None, use a revid of ZERO
         - data=None, pickle up some arbitrary data (the integer 7)
         - version=None, use the empty string version

        Returns the object's new revision id.
        """
        if oid is None:
            oid = self._storage.new_oid()
        if revid is None:
            revid = ZERO
        if data is None:
            data = MinPO(7)
        if type(data) == types.IntType:
            # Accept a bare int as a convenience; wrap it like the default.
            data = MinPO(data)
        if not already_pickled:
            data = zodb_pickle(data)
        if version is None:
            version = ''
        # Begin the transaction
        t = Transaction()
        if user is not None:
            t.user = user
        if description is not None:
            t.description = description
        try:
            self._storage.tpc_begin(t)
            # Store an object
            r1 = self._storage.store(oid, revid, data, version, t)
            # Finish the transaction
            r2 = self._storage.tpc_vote(t)
            # store()/tpc_vote() may each return serial info; merge them.
            revid = handle_serials(oid, r1, r2)
            self._storage.tpc_finish(t)
        except:
            # Never leave the two-phase commit dangling on failure.
            self._storage.tpc_abort(t)
            raise
        return revid

    def _dostoreNP(self, oid=None, revid=None, data=None, version=None,
                   user=None, description=None):
        # Like _dostore(), but `data` is already a pickle (No Pickling).
        return self._dostore(oid, revid, data, version, already_pickled=1,
                             user=user, description=description)
    # The following methods depend on optional storage features.

    def _undo(self, tid, oid=None):
        # Undo a tid that affects a single object (oid).
        # XXX This is very specialized
        t = Transaction()
        t.note("undo")
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        if oid is not None:
            # Exactly the one expected object should have been affected.
            self.assertEqual(len(oids), 1)
            self.assertEqual(oids[0], oid)
        return self._storage.lastTransaction()

    def _commitVersion(self, src, dst):
        # Commit version `src` into version `dst`; return affected oids.
        t = Transaction()
        t.note("commit %r to %r" % (src, dst))
        self._storage.tpc_begin(t)
        oids = self._storage.commitVersion(src, dst, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        return oids

    def _abortVersion(self, ver):
        # Abort version `ver`; return affected oids.
        t = Transaction()
        t.note("abort %r" % ver)
        self._storage.tpc_begin(t)
        oids = self._storage.abortVersion(ver, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        return oids


=== Added File Zope3/src/zodb/storage/tests/basic.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""Run the basic tests for a storage as described in the official storage API

The most complete and most out-of-date description of the interface is:
http://www.zope.org/Documentation/Developer/Models/ZODB/ZODB_Architecture_Storage_Interface_Info.html

All storages should be able to pass these tests.

$Id: basic.py,v 1.1.2.1 2002/12/23 19:30:50 jim Exp $
"""

from zodb.ztransaction import Transaction
from zodb import POSException

from zodb.storage.tests.minpo import MinPO
from zodb.storage.tests.base \
     import zodb_unpickle, zodb_pickle, handle_serials

# The null serial number: eight zero bytes (revision id of a new object).
ZERO = '\0'*8



class BasicStorage:
    """Mixin exercising the core storage API (tpc_*, store, load).

    Mixed into a concrete StorageTestBase subclass that provides
    self._storage and the _dostore() helper.
    """

    def checkBasics(self):
        # tpc_begin/tpc_abort basics, plus operations handed a Transaction
        # other than the one the storage was begun with.
        t = Transaction()
        self._storage.tpc_begin(t)
        # This should simply return
        self._storage.tpc_begin(t)
        # Aborting is easy
        self._storage.tpc_abort(t)
        # Test a few expected exceptions when we're doing operations giving a
        # different Transaction object than the one we've begun on.
        self._storage.tpc_begin(t)
        self.assertRaises(
            POSException.StorageTransactionError,
            self._storage.store,
            0, 0, 0, 0, Transaction())

        try:
            self._storage.abortVersion('dummy', Transaction())
        except (POSException.StorageTransactionError,
                POSException.VersionCommitError):
            pass # test passed ;)
        else:
            assert 0, "Should have failed, invalid transaction."

        try:
            self._storage.commitVersion('dummy', 'dummer', Transaction())
        except (POSException.StorageTransactionError,
                POSException.VersionCommitError):
            pass # test passed ;)
        else:
            assert 0, "Should have failed, invalid transaction."

        self.assertRaises(
            POSException.StorageTransactionError,
            self._storage.store,
            0, 1, 2, 3, Transaction())
        self._storage.tpc_abort(t)

    def checkSerialIsNoneForInitialRevision(self):
        eq = self.assertEqual
        oid = self._storage.new_oid()
        txn = Transaction()
        self._storage.tpc_begin(txn)
        # Use None for serial.  Don't use _dostore() here because that coerces
        # serial=None to serial=ZERO.
        r1 = self._storage.store(oid, None, zodb_pickle(MinPO(11)),
                                       '', txn)
        r2 = self._storage.tpc_vote(txn)
        self._storage.tpc_finish(txn)
        newrevid = handle_serials(oid, r1, r2)
        # The stored data must round-trip and carry the new serial.
        data, revid = self._storage.load(oid, '')
        value = zodb_unpickle(data)
        eq(value, MinPO(11))
        eq(revid, newrevid)

    def checkNonVersionStore(self, oid=None, revid=None, version=None):
        # A plain store must assign a revision id different from ZERO.
        revid = ZERO
        newrevid = self._dostore(revid=revid)
        # Finish the transaction.
        self.assertNotEqual(newrevid, revid)

    def checkNonVersionStoreAndLoad(self):
        eq = self.assertEqual
        oid = self._storage.new_oid()
        self._dostore(oid=oid, data=MinPO(7))
        data, revid = self._storage.load(oid, '')
        value = zodb_unpickle(data)
        eq(value, MinPO(7))
        # Now do a bunch of updates to an object
        for i in range(13, 22):
            revid = self._dostore(oid, revid=revid, data=MinPO(i))
        # Now get the latest revision of the object
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(21))

    def checkNonVersionModifiedInVersion(self):
        # An object never stored in a version reports the empty version.
        oid = self._storage.new_oid()
        self._dostore(oid=oid)
        self.assertEqual(self._storage.modifiedInVersion(oid), '')

    def checkConflicts(self):
        # Storing against a stale revision id must raise ConflictError.
        oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(11))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
        self.assertRaises(POSException.ConflictError,
                          self._dostore,
                          oid, revid=revid1, data=MinPO(13))

    def checkWriteAfterAbort(self):
        oid = self._storage.new_oid()
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
        # Now abort this transaction
        self._storage.tpc_abort(t)
        # Now start all over again
        oid = self._storage.new_oid()
        self._dostore(oid=oid, data=MinPO(6))

    def checkAbortAfterVote(self):
        oid1 = self._storage.new_oid()
        revid1 = self._dostore(oid=oid1, data=MinPO(-2))
        oid = self._storage.new_oid()
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
        # Vote, then abort this transaction
        self._storage.tpc_vote(t)
        self._storage.tpc_abort(t)
        # Now start all over again
        oid = self._storage.new_oid()
        revid = self._dostore(oid=oid, data=MinPO(6))

        # Both the pre-existing object and the fresh one must still load
        # with the serials we were given.
        for oid, revid in [(oid1, revid1), (oid, revid)]:
            data, _revid = self._storage.load(oid, '')
            self.assertEqual(revid, _revid)

    def checkStoreTwoObjects(self):
        # Independent objects must get distinct oids and serials.
        noteq = self.assertNotEqual
        p31, p32, p51, p52 = map(MinPO, (31, 32, 51, 52))
        oid1 = self._storage.new_oid()
        oid2 = self._storage.new_oid()
        noteq(oid1, oid2)
        revid1 = self._dostore(oid1, data=p31)
        revid2 = self._dostore(oid2, data=p51)
        noteq(revid1, revid2)
        revid3 = self._dostore(oid1, revid=revid1, data=p32)
        revid4 = self._dostore(oid2, revid=revid2, data=p52)
        noteq(revid3, revid4)

    def checkGetSerial(self):
        # getSerial() is optional; skip silently if unsupported.
        if not hasattr(self._storage, 'getSerial'):
            return
        eq = self.assertEqual
        p41, p42 = map(MinPO, (41, 42))
        oid = self._storage.new_oid()
        self.assertRaises(KeyError, self._storage.getSerial, oid)
        # Now store a revision
        revid1 = self._dostore(oid, data=p41)
        eq(revid1, self._storage.getSerial(oid))
        # And another one
        revid2 = self._dostore(oid, revid=revid1, data=p42)
        eq(revid2, self._storage.getSerial(oid))

    def checkTwoArgBegin(self):
        # XXX how standard is three-argument tpc_begin()?
        t = Transaction()
        tid = chr(42) * 8
        self._storage.tpc_begin(t, tid)
        oid = self._storage.new_oid()
        data = zodb_pickle(MinPO(8))
        self._storage.store(oid, None, data, '', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)


=== Added File Zope3/src/zodb/storage/tests/conflict.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""Tests for application-level conflict resolution."""

from zodb.ztransaction import Transaction
from zodb.interfaces import ConflictError, UndoError
from zodb.conflict import ResolveObjectReader
from persistence import Persistent

from zodb.storage.tests.base import zodb_unpickle, zodb_pickle

import sys
import types
from cPickle import Pickler, Unpickler
from cStringIO import StringIO

class PCounter(Persistent):
    """Persistent counter with no conflict-resolution support."""

    _value = 0

    def __repr__(self):
        return "<PCounter %d>" % self._value

    def inc(self):
        # Increment the persistent counter by one.
        self._value += 1

class RPCounter(PCounter):
    """Version of PCounter that supports conflict resolution."""

    def _p_resolveConflict(self, oldState, savedState, newState):
        # Apply both competing increments on top of the common ancestor:
        # saved + new - base == base + (saved - base) + (new - base).
        base = oldState['_value']
        merged = savedState['_value'] + newState['_value'] - base
        oldState['_value'] = merged
        return oldState

    # XXX What if _p_resolveConflict _thinks_ it resolved the
    # conflict, but did something wrong?

class PCounter2(PCounter):
    # Resolution hook that always gives up: raising ConflictError signals
    # that the conflict could not be resolved.
    def _p_resolveConflict(self, oldState, savedState, newState):
        raise ConflictError

class PCounter3(PCounter):
    # Buggy resolution hook: raises an unexpected (non-POS) exception,
    # which the storage is expected to propagate to the caller.
    def _p_resolveConflict(self, oldState, savedState, newState):
        raise AttributeError, "no attribute (testing conflict resolution)"

class PCounter4(PCounter):
    # Buggy resolution hook with the wrong arity; calling it with the
    # standard three states raises TypeError before the body ever runs.
    def _p_resolveConflict(self, oldState, savedState):
        raise RuntimeError, "Can't get here; not enough args"

class ConflictResolvingStorage:
    """Mixin exercising _p_resolveConflict-based conflict resolution.

    Mixed into a concrete StorageTestBase subclass that provides
    self._storage and the _dostoreNP() helper.
    """

    def checkResolve(self):
        obj = RPCounter()
        obj.inc()

        oid = self._storage.new_oid()

        revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))

        obj.inc()
        obj.inc()
        # The effect of committing two transactions with the same
        # pickle is to commit two different transactions relative to
        # revid1 that add two to _value.
        revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
        revid3 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))

        data, serialno = self._storage.load(oid, '')
        inst = zodb_unpickle(data)
        # 1 (initial) + 2 + 2: both conflicting increments were merged.
        self.assertEqual(inst._value, 5)

    def unresolvable(self, klass):
        # Assert that `klass` is known to be unresolvable.  (Fixed: the
        # original ignored its argument and always checked PCounter.)
        self.assert_(ResolveObjectReader.unresolvable(klass))

    def checkUnresolvable1(self):
        obj = PCounter()
        obj.inc()

        oid = self._storage.new_oid()

        revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))

        obj.inc()
        obj.inc()
        # The effect of committing two transactions with the same
        # pickle is to commit two different transactions relative to
        # revid1 that add two to _value.
        revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
        # PCounter has no _p_resolveConflict, so the second conflicting
        # store must fail outright.
        self.assertRaises(ConflictError,
                          self._dostoreNP,
                          oid, revid=revid1, data=zodb_pickle(obj))
        self.unresolvable(PCounter)

    def checkUnresolvable2(self):
        obj = PCounter2()
        obj.inc()

        oid = self._storage.new_oid()

        revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))

        obj.inc()
        obj.inc()
        # The effect of committing two transactions with the same
        # pickle is to commit two different transactions relative to
        # revid1 that add two to _value.
        revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
        # PCounter2's hook raises ConflictError itself.
        self.assertRaises(ConflictError,
                          self._dostoreNP,
                          oid, revid=revid1, data=zodb_pickle(obj))

    def checkBuggyResolve1(self):
        obj = PCounter3()
        obj.inc()

        oid = self._storage.new_oid()

        revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))

        obj.inc()
        obj.inc()
        # The effect of committing two transactions with the same
        # pickle is to commit two different transactions relative to
        # revid1 that add two to _value.
        revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
        # The hook's unexpected AttributeError must propagate unchanged.
        self.assertRaises(AttributeError,
                          self._dostoreNP,
                          oid, revid=revid1, data=zodb_pickle(obj))

    def checkBuggyResolve2(self):
        obj = PCounter4()
        obj.inc()

        oid = self._storage.new_oid()

        revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))

        obj.inc()
        obj.inc()
        # The effect of committing two transactions with the same
        # pickle is to commit two different transactions relative to
        # revid1 that add two to _value.
        revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
        # Wrong hook arity surfaces as a TypeError from the call itself.
        self.assertRaises(TypeError,
                          self._dostoreNP,
                          oid, revid=revid1, data=zodb_pickle(obj))

class ConflictResolvingTransUndoStorage:
    """Mixin testing the interaction of undo with conflict resolution."""

    def checkUndoConflictResolution(self):
        # This test is based on checkNotUndoable in the
        # TransactionalUndoStorage test suite.  Except here, conflict
        # resolution should allow us to undo the transaction anyway.

        obj = RPCounter()
        obj.inc()
        oid = self._storage.new_oid()
        revid_a = self._dostore(oid, data=obj)
        obj.inc()
        revid_b = self._dostore(oid, revid=revid_a, data=obj)
        obj.inc()
        revid_c = self._dostore(oid, revid=revid_b, data=obj)
        # Start the undo
        info = self._storage.undoInfo()
        tid = info[1]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.transactionalUndo(tid, t)
        self._storage.tpc_finish(t)

    def checkUndoUnresolvable(self):
        # This test is based on checkNotUndoable in the
        # TransactionalUndoStorage test suite.  Here PCounter2 cannot
        # resolve conflicts, so the undo must fail with UndoError.

        obj = PCounter2()
        obj.inc()
        oid = self._storage.new_oid()
        revid_a = self._dostore(oid, data=obj)
        obj.inc()
        revid_b = self._dostore(oid, revid=revid_a, data=obj)
        obj.inc()
        revid_c = self._dostore(oid, revid=revid_b, data=obj)
        # Start the undo
        info = self._storage.undoInfo()
        tid = info[1]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        self.assertRaises(UndoError, self._storage.transactionalUndo,
                          tid, t)
        self._storage.tpc_abort(t)



=== Added File Zope3/src/zodb/storage/tests/corruption.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""Do some minimal tests of data corruption"""

import os
import random
import stat
import tempfile
import unittest

import ZODB, ZODB.FileStorage
from zodb.storage.tests.base import StorageTestBase

class FileStorageCorruptTests(StorageTestBase):
    """Check that FileStorage recovers from a damaged .index file.

    NOTE(review): this module still imports the old ZODB.FileStorage
    names (see imports above) rather than the renamed zodb package --
    confirm this is intended after the NameGeddon rename.
    """

    def setUp(self):
        # NOTE(review): tempfile.mktemp() is race-prone; acceptable in a
        # test but mkstemp would be safer.
        self.path = tempfile.mktemp()
        self._storage = ZODB.FileStorage.FileStorage(self.path, create=1)

    def tearDown(self):
        # Close the storage and remove every auxiliary file it created.
        self._storage.close()
        for ext in '', '.old', '.tmp', '.lock', '.index':
            path = self.path + ext
            if os.path.exists(path):
                os.remove(path)

    def _do_stores(self):
        # Store five fresh objects; return their (oid, revid) pairs.
        oids = []
        for i in range(5):
            oid = self._storage.new_oid()
            revid = self._dostore(oid)
            oids.append((oid, revid))
        return oids

    def _check_stores(self, oids):
        # Every stored object must still load with its original serial.
        for oid, revid in oids:
            data, s_revid = self._storage.load(oid, '')
            self.assertEqual(s_revid, revid)

    def checkTruncatedIndex(self):
        oids = self._do_stores()
        self._close()
        
        # Truncate the index file to half its size; reopening the
        # storage must rebuild the index and still find every object.
        path = self.path + '.index'
        self.failUnless(os.path.exists(path))
        f = open(path, 'r+')
        f.seek(0, 2)
        size = f.tell()
        f.seek(size / 2)
        f.truncate()
        f.close()

        self._storage = ZODB.FileStorage.FileStorage(self.path)
        self._check_stores(oids)

    def checkCorruptedIndex(self):
        oids = self._do_stores()
        self._close()
        
        # Corrupt the index file by scattering null bytes through it;
        # reopening the storage must recover and find every object.
        path = self.path + '.index'
        self.failUnless(os.path.exists(path))
        size = os.stat(path)[stat.ST_SIZE]
        f = open(path, 'r+')
        while f.tell() < size:
            f.seek(random.randrange(1, size / 10), 1)
            f.write('\000')
        f.close()

        self._storage = ZODB.FileStorage.FileStorage(self.path)
        self._check_stores(oids)


=== Added File Zope3/src/zodb/storage/tests/history.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""Run the history() related tests for a storage.

Any storage that supports the history() method should be able to pass
all these tests.
"""

from zodb.ztransaction import Transaction
from zodb.storage.tests.minpo import MinPO
from zodb.storage.tests.base import zodb_unpickle



class HistoryStorage:
    """Mixin exercising a storage's history() method.

    Any storage that supports history() should pass these checks.
    """

    def _check_history(self, h, expected):
        # Assert that history list `h` matches the (serial, version)
        # pairs in `expected`, newest first.
        self.assertEqual(len(h), len(expected))
        for d, (serial, version) in zip(h, expected):
            self.assertEqual(d['serial'], serial)
            self.assertEqual(d['version'], version)

    def _store_three(self, oid):
        # Commit three consecutive non-version revisions of oid and
        # return their revision ids.
        revid1 = self._dostore(oid, data=MinPO(11))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
        return revid1, revid2, revid3

    def _store_three_versioned(self, oid, revid, version):
        # Commit three consecutive revisions of oid inside `version`.
        revid4 = self._dostore(oid, revid=revid, data=MinPO(14),
                               version=version)
        revid5 = self._dostore(oid, revid=revid4, data=MinPO(15),
                               version=version)
        revid6 = self._dostore(oid, revid=revid5, data=MinPO(16),
                               version=version)
        return revid4, revid5, revid6

    def checkSimpleHistory(self):
        oid = self._storage.new_oid()
        revid1, revid2, revid3 = self._store_three(oid)
        # Snapshots of the object's history at increasing sizes,
        # newest revision first.
        self._check_history(self._storage.history(oid, size=1),
                            [(revid3, '')])
        self._check_history(self._storage.history(oid, size=2),
                            [(revid3, ''), (revid2, '')])
        self._check_history(self._storage.history(oid, size=3),
                            [(revid3, ''), (revid2, ''), (revid1, '')])
        # Asking for more revisions than exist yields just the three.
        self._check_history(self._storage.history(oid, size=4),
                            [(revid3, ''), (revid2, ''), (revid1, '')])

    def checkVersionHistory(self):
        oid = self._storage.new_oid()
        revid1, revid2, revid3 = self._store_three(oid)
        version = 'test-version'
        revid4, revid5, revid6 = self._store_three_versioned(
            oid, revid3, version)
        # All six revisions, newest first: the three in-version
        # revisions, then the non-version ones.
        self._check_history(self._storage.history(oid, version, 100),
                            [(revid6, version), (revid5, version),
                             (revid4, version), (revid3, ''),
                             (revid2, ''), (revid1, '')])

    def checkHistoryAfterVersionCommit(self):
        oid = self._storage.new_oid()
        revid1, revid2, revid3 = self._store_three(oid)
        version = 'test-version'
        revid4, revid5, revid6 = self._store_three_versioned(
            oid, revid3, version)
        # Commit the version.
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.commitVersion(version, '', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        # Per agreement with Jim: committing a version gives the
        # committed object a brand-new serial number.  (FileStorage is
        # broken here -- the post-commit non-version revision reuses the
        # previous in-version serial.)  load() is the only way to learn
        # the current serial, and works for broken and working storages
        # alike.
        ign, revid7 = self._storage.load(oid, '')
        # All seven revisions, newest first.
        self._check_history(self._storage.history(oid, version, 100),
                            [(revid7, ''), (revid6, version),
                             (revid5, version), (revid4, version),
                             (revid3, ''), (revid2, ''), (revid1, '')])

    def checkHistoryAfterVersionAbort(self):
        oid = self._storage.new_oid()
        revid1, revid2, revid3 = self._store_three(oid)
        version = 'test-version'
        revid4, revid5, revid6 = self._store_three_versioned(
            oid, revid3, version)
        # Abort the version.
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.abortVersion(version, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        # As above: the abort gives the object a new current serial,
        # discoverable only via load().
        ign, revid7 = self._storage.load(oid, '')
        # All seven revisions, newest first.
        self._check_history(self._storage.history(oid, version, 100),
                            [(revid7, ''), (revid6, version),
                             (revid5, version), (revid4, version),
                             (revid3, ''), (revid2, ''), (revid1, '')])


=== Added File Zope3/src/zodb/storage/tests/iterator.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""Run tests against the iterator() interface for storages.

Any storage that supports the iterator() method should be able to pass
all these tests.
"""

from zodb.storage.tests.minpo import MinPO
from zodb.storage.tests.base import zodb_unpickle
from zodb.utils import u64, p64
from zodb.ztransaction import Transaction


class IteratorCompare:
    """Shared helper for checking a transaction iterator's contents."""

    def iter_verify(self, txniter, revids, val0):
        # Walk txniter and check that it yields exactly the
        # transactions whose ids are in revids (in order), each holding
        # a single record for self._oid whose MinPO payloads count up
        # from val0.  The trailing [None] lets zip consume every
        # transaction the iterator produces without truncating early.
        expected_val = val0
        for txn, revid in zip(txniter, revids + [None]):
            self.assertEqual(txn.tid, revid)
            for record in txn:
                self.assertEqual(record.oid, self._oid)
                self.assertEqual(record.serial, revid)
                self.assertEqual(record.version, '')
                self.assertEqual(zodb_unpickle(record.data),
                                 MinPO(expected_val))
                expected_val = expected_val + 1
        # Every expected revision must have been seen exactly once.
        self.assertEqual(expected_val, val0 + len(revids))

class IteratorStorage(IteratorCompare):
    """Mixin exercising a storage's iterator() method."""

    def checkSimpleIteration(self):
        # Commit three revisions of one object, then walk the whole
        # transaction iterator and compare it record by record.
        self._oid = oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(11))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
        self.iter_verify(self._storage.iterator(),
                         [revid1, revid2, revid3], 11)

    def checkClose(self):
        # A closed iterator must refuse further access.
        self._oid = oid = self._storage.new_oid()
        self._dostore(oid, data=MinPO(11))
        txniter = self._storage.iterator()
        txniter.close()
        self.assertRaises(IOError, txniter.__getitem__, 0)

    def checkVersionIterator(self):
        # Iterating a storage containing both aborted and committed
        # version data must not blow up.
        if not self._storage.supportsVersions():
            return
        self._dostore()
        self._dostore(version='abort')
        self._dostore()
        self._dostore(version='abort')
        txn = Transaction()
        self._storage.tpc_begin(txn)
        self._storage.abortVersion('abort', txn)
        self._storage.tpc_vote(txn)
        self._storage.tpc_finish(txn)

        self._dostore(version='commit')
        self._dostore()
        self._dostore(version='commit')
        txn = Transaction()
        self._storage.tpc_begin(txn)
        self._storage.commitVersion('commit', '', txn)
        self._storage.tpc_vote(txn)
        self._storage.tpc_finish(txn)

        for trans in self._storage.iterator():
            for record in trans:
                pass

    def checkUndoZombieNonVersion(self):
        # Undo the transaction that created an object, leaving it in
        # the "George Bailey" state (as Barry puts it): it exists but
        # was never born.  The final data record for it must have a
        # data attribute of None, and iteration must still work.
        if not hasattr(self._storage, 'supportsTransactionalUndo'):
            return
        if not self._storage.supportsTransactionalUndo():
            return

        oid = self._storage.new_oid()
        self._dostore(oid, data=MinPO(94))
        # Undo the creating transaction.
        tid = self._storage.undoInfo()[0]['id']
        txn = Transaction()
        self._storage.tpc_begin(txn)
        self._storage.transactionalUndo(tid, txn)
        self._storage.tpc_vote(txn)
        self._storage.tpc_finish(txn)
        # Iterate the whole storage; `rec` is left bound to the last
        # record seen, which the assertions below inspect.
        for trans in self._storage.iterator():
            for rec in trans:
                pass
        self.assertEqual(rec.oid, oid)
        self.assertEqual(rec.data, None)

    def checkTransactionExtensionFromIterator(self):
        # Each iterated transaction exposes its (empty) extension dict.
        oid = self._storage.new_oid()
        self._dostore(oid, data=MinPO(1))
        count = 0
        for txn in self._storage.iterator():
            self.assertEqual(txn._extension, {})
            count = count + 1
        self.assertEqual(count, 1)


class ExtendedIteratorStorage(IteratorCompare):
    """Mixin exercising iterator() with explicit start/stop bounds."""

    def checkExtendedIteration(self):
        # Commit four revisions of a single object.
        self._oid = oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=MinPO(11))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(12))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(13))
        revid4 = self._dostore(oid, revid=revid3, data=MinPO(14))
        # Both end points are inclusive.
        self.iter_verify(self._storage.iterator(revid1, revid4),
                         [revid1, revid2, revid3, revid4], 11)
        # Explicit start only.
        self.iter_verify(self._storage.iterator(revid3),
                         [revid3, revid4], 13)
        # Explicit stop only.
        self.iter_verify(self._storage.iterator(None, revid2),
                         [revid1, revid2], 11)
        # Explicit start and stop.
        self.iter_verify(self._storage.iterator(revid2, revid3),
                         [revid2, revid3], 12)
        # An upper bound falling strictly between two transaction ids.
        revid3a = p64((u64(revid3) + u64(revid4)) / 2)
        self.iter_verify(self._storage.iterator(revid2, revid3a),
                         [revid2, revid3], 12)
        # A lower bound between two ids.  revid2 == revid1+1 is very
        # likely on Windows; adding 1 before halving keeps the computed
        # midpoint strictly greater than revid1.
        revid1a = p64((u64(revid1) + 1 + u64(revid2)) / 2)
        assert revid1 < revid1a
        self.iter_verify(self._storage.iterator(revid1a, revid3a),
                         [revid2, revid3], 12)
        # An empty range.
        self.iter_verify(self._storage.iterator(revid3, revid2), [], 13)
        # A singleton range.
        self.iter_verify(self._storage.iterator(revid3, revid3),
                         [revid3], 13)

class IteratorDeepCompare:
    """Mixin asserting that two storages hold identical histories."""

    def compare(self, storage1, storage2):
        # Walk both transaction iterators in lockstep, comparing each
        # transaction's metadata and then each of its data records.
        eq = self.assertEqual
        iter1 = storage1.iterator()
        iter2 = storage2.iterator()
        for txn1, txn2 in zip(iter1, iter2):
            for attr in ('tid', 'status', 'user', 'description',
                         '_extension'):
                eq(getattr(txn1, attr), getattr(txn2, attr))
            for rec1, rec2 in zip(txn1, txn2):
                eq(rec1.oid, rec2.oid)
                eq(rec1.serial, rec2.serial)
                eq(rec1.version, rec2.version)
                eq(rec1.data, rec2.data)
            # Both transactions must now be exhausted, i.e. they held
            # the same number of records.
            self.assertRaises(IndexError, txn1.next)
            self.assertRaises(IndexError, txn2.next)
        # Both iterators must now be exhausted, i.e. they held the
        # same number of transactions.
        self.assertRaises(IndexError, iter1.next)
        self.assertRaises(IndexError, iter2.next)


=== Added File Zope3/src/zodb/storage/tests/local.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
class LocalStorage:
    """A single test that only makes sense for local storages.

    A local storage is one that doesn't go through ZEO; the ZEO
    __len__() implementation is inexact, so this check would be
    unreliable there.
    """
    def checkLen(self):
        # len() of the storage grows by one per committed store.
        self.assertEqual(len(self._storage), 0)
        self._dostore()
        self.assertEqual(len(self._storage), 1)
        self._dostore()
        self.assertEqual(len(self._storage), 2)


=== Added File Zope3/src/zodb/storage/tests/minpo.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""A minimal persistent object to use for tests"""

from persistence import Persistent

class MinPO(Persistent):
    """A minimal persistent object wrapping a single comparable value."""

    def __init__(self, value=None):
        self.value = value

    def __cmp__(self, other):
        # Order MinPO instances by their wrapped values.
        return cmp(self.value, other.value)

    def __repr__(self):
        return "MinPO(%s)" % self.value


=== Added File Zope3/src/zodb/storage/tests/mt.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
import random
import threading
import time

import zodb.db
from persistence.dict import PersistentDict
from transaction import get_transaction
from zodb.ztransaction import Transaction

from zodb.storage.tests.base \
     import StorageTestBase, zodb_pickle, zodb_unpickle, handle_serials
from zodb.storage.tests.minpo import MinPO
from zodb.interfaces import ConflictError

SHORT_DELAY = 0.01

def sort(l):
    """Sort l in place and hand the very same list object back.

    Convenience wrapper so a freshly-sorted list can appear inline in
    an assertion, e.g. assertEqual(sort(d.keys()), expected).
    """
    l.sort()
    return l

class ZODBClientThread(threading.Thread):
    """Client thread that hammers a ZODB database with small commits."""

    __super_init = threading.Thread.__init__

    def __init__(self, db, test, commits=10, delay=SHORT_DELAY):
        self.__super_init()
        self.setDaemon(1)
        self.db = db
        self.test = test
        self.commits = commits
        self.delay = delay

    def run(self):
        conn = self.db.open()
        root = conn.root()
        d = self.get_thread_dict(root)
        if d is None:
            self.test.fail()
        else:
            for i in range(self.commits):
                self.commit(d, i)
        # Every commit should have landed in this thread's dict.
        self.test.assertEqual(sort(d.keys()), range(self.commits))

    def commit(self, d, num):
        # Record a timestamp under key num, sleeping around the commit
        # to encourage interleaving with the other client threads.
        d[num] = time.time()
        time.sleep(self.delay)
        get_transaction().commit()
        time.sleep(self.delay)

    def get_thread_dict(self, root):
        # Install a PersistentDict under this thread's name, retrying
        # on conflicts (arbitrarily capped at 10 attempts), then read
        # it back with the same retry policy.
        name = self.getName()
        for attempt in range(10):
            try:
                root[name] = PersistentDict()
                get_transaction().commit()
                break
            except ConflictError:
                get_transaction().abort()
        for attempt in range(10):
            try:
                return root.get(name)
            except ConflictError:
                get_transaction().abort()

class StorageClientThread(threading.Thread):
    """Client thread driving a storage directly through its 2PC API."""

    __super_init = threading.Thread.__init__

    def __init__(self, storage, test, commits=10, delay=SHORT_DELAY):
        self.__super_init()
        self.storage = storage
        self.test = test
        self.commits = commits
        self.delay = delay
        # Maps oid -> revid of the store we made for it (None until
        # the commit finishes).
        self.oids = {}

    def run(self):
        for i in range(self.commits):
            self.dostore(i)
        self.check()

    def check(self):
        # Every object stored by this thread must load back with the
        # recorded serial and carry this thread's name in its payload.
        for oid, revid in self.oids.items():
            data, serial = self.storage.load(oid, '')
            self.test.assertEqual(serial, revid)
            obj = zodb_unpickle(data)
            self.test.assertEqual(obj.value[0], self.getName())

    def pause(self):
        time.sleep(self.delay)

    def oid(self):
        # Allocate a fresh oid and register it; its revid is recorded
        # once the two-phase commit finishes.
        new_oid = self.storage.new_oid()
        self.oids[new_oid] = None
        return new_oid

    def dostore(self, i):
        # Run one complete two-phase commit, pausing between each step
        # to encourage interleaving with the other client threads.
        data = zodb_pickle(MinPO((self.getName(), i)))
        t = Transaction()
        oid = self.oid()
        self.pause()

        self.storage.tpc_begin(t)
        self.pause()

        # revid None: always create a brand-new object.
        r1 = self.storage.store(oid, None, data, '', t)
        self.pause()

        r2 = self.storage.tpc_vote(t)
        self.pause()

        self.storage.tpc_finish(t)
        self.pause()

        self.oids[oid] = handle_serials(oid, r1, r2)

class ExtStorageClientThread(StorageClientThread):
    """Storage client that mixes random read operations into its stores."""

    def run(self):
        # Gather the do_* read operations defined on this class.
        ops = [getattr(self, meth) for meth in dir(ExtStorageClientThread)
               if meth.startswith('do_')]
        assert ops, "Didn't find an storage ops in %s" % self.storage
        # Seed self.oids so pick_oid() always has something to choose.
        self.dostore(0)

        for i in range(self.commits - 1):
            random.choice(ops)()
            self.dostore(i)
        self.check()

    def pick_oid(self):
        # Any oid this thread has already stored.
        return random.choice(self.oids.keys())

    def do_load(self):
        self.storage.load(self.pick_oid(), '')

    def do_loadSerial(self):
        oid = self.pick_oid()
        self.storage.loadSerial(oid, self.oids[oid])

    def do_modifiedInVersion(self):
        self.storage.modifiedInVersion(self.pick_oid())

    def do_undoLog(self):
        self.storage.undoLog(0, -20)

    def do_iterator(self):
        try:
            txniter = self.storage.iterator()
        except AttributeError:
            # XXX A ZEO ClientStorage lacks iterator() but has all the
            # other operations; hard to detect more cleanly than this.
            return
        for obj in txniter:
            pass

class MTStorage:
    "Test a storage with multiple client threads executing concurrently."

    def _checkNThreads(self, n, constructor, *args):
        # Start n client threads and require each to finish within 60
        # seconds; a thread still alive after join(60) is a failure.
        threads = [constructor(*args) for i in range(n)]
        for t in threads:
            t.start()
        for t in threads:
            t.join(60)
        for t in threads:
            self.failIf(t.isAlive(), "thread failed to finish in 60 seconds")

    def check2ZODBThreads(self):
        # Bug fix: this module imports zodb.db (see the imports at the
        # top); the pre-rename spelling ZODB.DB.DB is unbound here and
        # raised NameError.
        db = zodb.db.DB(self._storage)
        self._checkNThreads(2, ZODBClientThread, db, self)

    def check7ZODBThreads(self):
        db = zodb.db.DB(self._storage)
        self._checkNThreads(7, ZODBClientThread, db, self)

    def check2StorageThreads(self):
        self._checkNThreads(2, StorageClientThread, self._storage, self)

    def check7StorageThreads(self):
        self._checkNThreads(7, StorageClientThread, self._storage, self)

    def check4ExtStorageThread(self):
        self._checkNThreads(4, ExtStorageClientThread, self._storage, self)
        


=== Added File Zope3/src/zodb/storage/tests/packable.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""Run some tests relevant for storages that support pack()."""

try:
    import cPickle
    pickle = cPickle
    #import cPickle as pickle
except ImportError:
    import pickle

from cStringIO import StringIO

import time
from zodb.db import DB
from zodb.storage.tests.minpo import MinPO
from persistence import Persistent
from transaction import get_transaction

ZERO = '\0'*8


# This class is for the root object.  It must not contain a getoid() method
# (really, attribute).  The persistent pickling machinery -- in the dumps()
# function below -- will pickle Root objects as normal, but any attributes
# which reference persistent Object instances will get pickled as persistent
# ids, not as the object's state.  This makes the referencesf machinery
# work, because it sniffs pickles for persistent ids (so we have to get
# those persistent ids into the root object's pickle).
class Root:
    """Root object: pickled by value, since it has no getoid() method."""
    pass


# This is the persistent Object class.  Because it has a getoid() method, the
# persistent pickling machinery -- in the dumps() function below -- will
# pickle the oid string instead of the object's actual state.  Yee haw, this
# stuff is deep. ;)
class Object:
    """A stand-in persistent object identified only by its oid.

    The getoid() accessor is what the dumps() pickler sniffs for: any
    object exposing it gets pickled as a persistent id rather than as
    its state.
    """

    def __init__(self, oid):
        self._oid = oid

    def getoid(self):
        # The oid this object was created with.
        return self._oid

class C(Persistent):
    """An ordinary persistent class used by the pack tests."""
    pass


# Here's where all the magic occurs.  Sadly, the pickle module is a bit
# underdocumented, but here's what happens: by setting the persistent_id
# attribute to getpersid() on the pickler, that function gets called for every
# object being pickled.  By returning None when the object has no getoid
# attribute, it signals pickle to serialize the object as normal.  That's how
# the Root instance gets pickled correctly.  But, if the object has a getoid
# attribute, then by returning that method's value, we tell pickle to
# serialize the persistent id of the object instead of the object's state.
# That sets the pickle up for proper sniffing by the referencesf machinery.
# Fun, huh?
def dumps(obj):
    """Pickle obj, emitting persistent ids for objects with getoid().

    The persistent_id hook installed on the pickler returns None for
    ordinary objects (so they pickle normally -- that is how a Root
    instance is handled) and the oid for anything exposing getoid(),
    which makes the resulting pickle carry persistent ids for the
    referencesf machinery to sniff.
    """
    def getpersid(o):
        getoid = getattr(o, 'getoid', None)
        if getoid is not None:
            return getoid()
        return None
    buf = StringIO()
    pickler = pickle.Pickler(buf)
    pickler.persistent_id = getpersid
    pickler.dump(obj)
    return buf.getvalue()



class PackableStorageBase:
    # Maps oid -> Object instance so the custom unpickler below can
    # hand back any persistent object by id.  (Class-level, shared by
    # design across the test instances.)
    _cache = {}

    def _newobj(self):
        """Create, cache, and return a fresh Object with a new oid."""
        obj = Object(self._storage.new_oid())
        self._cache[obj.getoid()] = obj
        return obj

    def _makeloader(self):
        """Return an unpickling function mirroring dumps() above.

        The returned loads() resolves persistent ids through
        self._cache, so unpickling a persistent id yields the cached
        Object instance.  Binding persfunc as a default argument keeps
        the call sites in the tests succinct.

        BUT!  Be careful: use the returned loads() only on the Root
        object's pickle, since it is the only special one; plain
        Object pickles should go through pickle.loads().
        """
        def loads(data, persfunc=self._cache.get):
            unpickler = pickle.Unpickler(StringIO(data))
            unpickler.persistent_load = persfunc
            return unpickler.load()
        return loads



class PackableStorage(PackableStorageBase):
    """Tests of pack() behavior for storages that support packing.

    Mixed into a unittest.TestCase that provides self._storage,
    self._dostoreNP(), etc.
    """

    def _initroot(self):
        # Create the root object (oid ZERO) if the storage doesn't have
        # one yet, committing it in a minimal two-phase transaction.
        try:
            self._storage.load(ZERO, '')
        except KeyError:
            from persistence.dict import PersistentDict
            from zodb.ztransaction import Transaction
            file = StringIO()
            p = cPickle.Pickler(file, 1)
            # Standard (class, args) + state pickle format for a ZODB record.
            p.dump((PersistentDict, None))
            p.dump(PersistentDict().__getstate__())
            t = Transaction()
            t.note("initial database creation")
            self._storage.tpc_begin(t)
            self._storage.store(ZERO, None, file.getvalue(), '', t)
            self._storage.tpc_vote(t)
            self._storage.tpc_finish(t)

    def checkPackEmptyStorage(self):
        # Packing a storage with no data must not fail.
        self._storage.pack(time.time())

    def checkPackTomorrow(self):
        # Pack time in the future: nothing eligible should be removed.
        self._initroot()
        self._storage.pack(time.time() + 100000)

    def checkPackYesterday(self):
        # Pack time in the past: nothing committed since then to remove.
        self._initroot()
        self._storage.pack(time.time() - 100000)

    def checkPackAllRevisions(self):
        self._initroot()
        eq = self.assertEqual
        raises = self.assertRaises
        # Create a `persistent' object
        obj = self._newobj()
        oid = obj.getoid()
        obj.value = 1
        # Commit three different revisions
        revid1 = self._dostoreNP(oid, data=pickle.dumps(obj))
        obj.value = 2
        revid2 = self._dostoreNP(oid, revid=revid1, data=pickle.dumps(obj))
        obj.value = 3
        revid3 = self._dostoreNP(oid, revid=revid2, data=pickle.dumps(obj))
        # Now make sure all three revisions can be extracted
        data = self._storage.loadSerial(oid, revid1)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 1)
        data = self._storage.loadSerial(oid, revid2)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 2)
        data = self._storage.loadSerial(oid, revid3)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 3)
        # Now pack all transactions; need to sleep a second to make
        # sure that the pack time is greater than the last commit time.
        time.sleep(1)
        self._storage.pack(time.time())
        # All revisions of the object should be gone, since there is no
        # reference from the root object to this object.
        raises(KeyError, self._storage.loadSerial, oid, revid1)
        raises(KeyError, self._storage.loadSerial, oid, revid2)
        raises(KeyError, self._storage.loadSerial, oid, revid3)

    def checkPackJustOldRevisions(self):
        eq = self.assertEqual
        raises = self.assertRaises
        loads = self._makeloader()
        # Create a root object.  This can't be an instance of Object,
        # otherwise the pickling machinery will serialize it as a persistent
        # id and not as an object that contains references (persistent ids) to
        # other objects.
        root = Root()
        # Create a persistent object, with some initial state
        obj = self._newobj()
        oid = obj.getoid()
        # Link the root object to the persistent object, in order to keep the
        # persistent object alive.  Store the root object.
        root.obj = obj
        root.value = 0
        revid0 = self._dostoreNP(ZERO, data=dumps(root))
        # Make sure the root can be retrieved
        data, revid = self._storage.load(ZERO, '')
        eq(revid, revid0)
        eq(loads(data).value, 0)
        # Commit three different revisions of the other object
        obj.value = 1
        revid1 = self._dostoreNP(oid, data=pickle.dumps(obj))
        obj.value = 2
        revid2 = self._dostoreNP(oid, revid=revid1, data=pickle.dumps(obj))
        obj.value = 3
        revid3 = self._dostoreNP(oid, revid=revid2, data=pickle.dumps(obj))
        # Now make sure all three revisions can be extracted
        data = self._storage.loadSerial(oid, revid1)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 1)
        data = self._storage.loadSerial(oid, revid2)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 2)
        data = self._storage.loadSerial(oid, revid3)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 3)
        # Now pack just revisions 1 and 2.  The object's current revision
        # should stay alive because it's pointed to by the root.
        time.sleep(1)
        self._storage.pack(time.time())
        # Make sure the revisions are gone, but that object zero and revision
        # 3 are still there and correct
        data, revid = self._storage.load(ZERO, '')
        eq(revid, revid0)
        eq(loads(data).value, 0)
        raises(KeyError, self._storage.loadSerial, oid, revid1)
        raises(KeyError, self._storage.loadSerial, oid, revid2)
        data = self._storage.loadSerial(oid, revid3)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 3)
        data, revid = self._storage.load(oid, '')
        eq(revid, revid3)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid)
        eq(pobj.value, 3)

    def checkPackOnlyOneObject(self):
        eq = self.assertEqual
        raises = self.assertRaises
        loads = self._makeloader()
        # Create a root object.  This can't be an instance of Object,
        # otherwise the pickling machinery will serialize it as a persistent
        # id and not as an object that contains references (persistent ids) to
        # other objects.
        root = Root()
        # Create a persistent object, with some initial state
        obj1 = self._newobj()
        oid1 = obj1.getoid()
        # Create another persistent object, with some initial state.  Make
        # sure its oid is greater than the first object's oid.
        obj2 = self._newobj()
        oid2 = obj2.getoid()
        self.failUnless(oid2 > oid1)
        # Link the root object to the persistent objects, in order to keep
        # them alive.  Store the root object.
        root.obj1 = obj1
        root.obj2 = obj2
        root.value = 0
        revid0 = self._dostoreNP(ZERO, data=dumps(root))
        # Make sure the root can be retrieved
        data, revid = self._storage.load(ZERO, '')
        eq(revid, revid0)
        eq(loads(data).value, 0)
        # Commit three different revisions of the first object
        obj1.value = 1
        revid1 = self._dostoreNP(oid1, data=pickle.dumps(obj1))
        obj1.value = 2
        revid2 = self._dostoreNP(oid1, revid=revid1, data=pickle.dumps(obj1))
        obj1.value = 3
        revid3 = self._dostoreNP(oid1, revid=revid2, data=pickle.dumps(obj1))
        # Now make sure all three revisions can be extracted
        data = self._storage.loadSerial(oid1, revid1)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid1)
        eq(pobj.value, 1)
        data = self._storage.loadSerial(oid1, revid2)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid1)
        eq(pobj.value, 2)
        data = self._storage.loadSerial(oid1, revid3)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid1)
        eq(pobj.value, 3)
        # Now commit a revision of the second object
        obj2.value = 11
        revid4 = self._dostoreNP(oid2, data=pickle.dumps(obj2))
        # And make sure the revision can be extracted
        data = self._storage.loadSerial(oid2, revid4)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid2)
        eq(pobj.value, 11)
        # Now pack just revisions 1 and 2 of object1.  Object1's current
        # revision should stay alive because it's pointed to by the root, as
        # should Object2's current revision.
        time.sleep(1)
        self._storage.pack(time.time())
        # Make sure the revisions are gone, but that object zero, object2, and
        # revision 3 of object1 are still there and correct.
        data, revid = self._storage.load(ZERO, '')
        eq(revid, revid0)
        eq(loads(data).value, 0)
        raises(KeyError, self._storage.loadSerial, oid1, revid1)
        raises(KeyError, self._storage.loadSerial, oid1, revid2)
        data = self._storage.loadSerial(oid1, revid3)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid1)
        eq(pobj.value, 3)
        data, revid = self._storage.load(oid1, '')
        eq(revid, revid3)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid1)
        eq(pobj.value, 3)
        data, revid = self._storage.load(oid2, '')
        eq(revid, revid4)
        eq(loads(data).value, 11)
        data = self._storage.loadSerial(oid2, revid4)
        pobj = pickle.loads(data)
        eq(pobj.getoid(), oid2)
        eq(pobj.value, 11)

    def checkPackUnlinkedFromRoot(self):
        # Pack an object whose only link from the root was removed, then
        # undo the unlinking and check the object is reachable again.
        eq = self.assertEqual
        db = DB(self._storage)
        conn = db.open()
        root = conn.root()

        txn = get_transaction()
        txn.note('root')
        txn.commit()

        # Busy-wait until the clock ticks so packtime is strictly after
        # the root commit.
        now = packtime = time.time()
        while packtime <= now:
            packtime = time.time()

        obj = MinPO(7)

        root['obj'] = obj
        txn = get_transaction()
        txn.note('root -> o1')
        txn.commit()

        del root['obj']
        txn = get_transaction()
        txn.note('root -x-> o1')
        txn.commit()

        # Pack as of packtime: only the bare-root state is older, so the
        # later transactions (including the unlink) must survive.
        self._storage.pack(packtime)

        log = self._storage.undoLog()
        tid = log[0]['id']
        db.undo(tid)
        txn = get_transaction()
        txn.note('undo root -x-> o1')
        txn.commit()

        conn.sync()

        # The undone unlink restores the reference to the object.
        eq(root['obj'].value, 7)


=== Added File Zope3/src/zodb/storage/tests/persistent.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""Test that a storage's values persist across open and close."""

class PersistentStorage:
    """Check that stored values survive a close/reopen of the storage."""

    def checkUpdatesPersist(self):
        created = []
        real_new_oid = self._storage.new_oid

        # Wrap new_oid so we remember every oid the storage hands out.
        def new_oid_wrapper():
            oid = real_new_oid()
            created.append(oid)
            return oid

        self._storage.new_oid = new_oid_wrapper

        # Store a mix of plain and versioned revisions.
        self._dostore()
        oid = self._storage.new_oid()
        revid = self._dostore(oid)
        self._dostore(oid, revid, data=8, version='b')
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=1)
        revid = self._dostore(oid, revid, data=2)
        self._dostore(oid, revid, data=3)

        # Snapshot every object's current pickle and serial (and its
        # version copy, if modified in a version).
        snapshot = []
        for oid in created:
            p, s = self._storage.load(oid, '')
            snapshot.append((oid, '', p, s))
            ver = self._storage.modifiedInVersion(oid)
            if ver:
                p, s = self._storage.load(oid, ver)
                snapshot.append((oid, ver, p, s))

        self._storage.close()
        self.open()

        # Everything must read back unchanged after reopening.
        for oid, ver, p, s in snapshot:
            _p, _s = self._storage.load(oid, ver)
            self.assertEqual(p, _p)
            self.assertEqual(s, _s)


=== Added File Zope3/src/zodb/storage/tests/readonly.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from zodb.interfaces import ReadOnlyError
from zodb.ztransaction import Transaction

class ReadOnlyStorage:
    """Check that a read-only storage serves reads and rejects writes."""

    def _create_data(self):
        # Pre-populate the storage with ten objects so the read-only
        # checks have data to load; remember each object's revid.
        self.oids = {}
        for _ in range(10):
            oid = self._storage.new_oid()
            self.oids[oid] = self._dostore(oid)

    def _make_readonly(self):
        # Close and reopen the storage in read-only mode.
        self._storage.close()
        self.open(read_only=1)
        self.assert_(self._storage.isReadOnly())

    def checkReadMethods(self):
        self._create_data()
        self._make_readonly()
        # XXX not going to bother checking all read methods
        for oid, stored_revid in self.oids.items():
            data, revid = self._storage.load(oid, '')
            self.assertEqual(revid, stored_revid)
            self.assert_(not self._storage.modifiedInVersion(oid))
            self.assertEqual(data, self._storage.loadSerial(oid, revid))

    def checkWriteMethods(self):
        # Every mutating API must raise ReadOnlyError.
        self._make_readonly()
        t = Transaction()
        self.assertRaises(ReadOnlyError, self._storage.new_oid)
        self.assertRaises(ReadOnlyError, self._storage.tpc_begin, t)
        self.assertRaises(ReadOnlyError, self._storage.abortVersion,
                          '', t)
        self.assertRaises(ReadOnlyError, self._storage.commitVersion,
                          '', '', t)
        self.assertRaises(ReadOnlyError, self._storage.store,
                          '\000' * 8, None, '', '', t)
        if self._storage.supportsTransactionalUndo():
            self.assertRaises(ReadOnlyError, self._storage.transactionalUndo,
                              '\000' * 8, t)


=== Added File Zope3/src/zodb/storage/tests/recovery.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""More recovery and iterator tests.

$Id: recovery.py,v 1.1.2.1 2002/12/23 19:30:50 jim Exp $
"""

from zodb.ztransaction import Transaction
from zodb.storage.tests.iterator import IteratorDeepCompare
from zodb.storage.tests.base import MinPO, zodb_unpickle
from zodb.db import DB

from transaction import get_transaction

import time

class RecoveryStorage(IteratorDeepCompare):
    """Tests of copyTransactionsFrom()/restore() based recovery."""

    # Requires a setUp() that creates a self._dst destination storage
    def checkSimpleRecovery(self):
        # Three plain revisions copy over and compare equal.
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=11)
        revid = self._dostore(oid, revid=revid, data=12)
        revid = self._dostore(oid, revid=revid, data=13)
        self._dst.copyTransactionsFrom(self._storage)
        self.compare(self._storage, self._dst)

    def checkRecoveryAcrossVersions(self):
        # Revisions spanning a version commit copy over correctly.
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=21)
        revid = self._dostore(oid, revid=revid, data=22)
        revid = self._dostore(oid, revid=revid, data=23, version='one')
        revid = self._dostore(oid, revid=revid, data=34, version='one')
        # Now commit the version
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.commitVersion('one', '', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        self._dst.copyTransactionsFrom(self._storage)
        self.compare(self._storage, self._dst)

    def checkRecoverAbortVersion(self):
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=21, version="one")
        revid = self._dostore(oid, revid=revid, data=23, version='one')
        revid = self._dostore(oid, revid=revid, data=34, version='one')
        # Now abort the version and the creation
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.abortVersion('one', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        self.assertEqual(oids, [oid])
        self._dst.copyTransactionsFrom(self._storage)
        self.compare(self._storage, self._dst)
        # Also make sure the last transaction has a data record
        # with None for its data attribute, because we've undone the
        # object.
        for s in self._storage, self._dst:
            iter = s.iterator()
            for trans in iter:
                pass # iterate until we get the last one
            data = trans[0]
            # The final transaction must contain exactly one record.
            self.assertRaises(IndexError, lambda i, t=trans: t[i], 1)
            self.assertEqual(data.oid, oid)
            self.assertEqual(data.data, None)

    def checkRecoverUndoInVersion(self):
        # Interleave version stores, undos and a version commit, then
        # verify the copy and re-apply the same operations on both sides.
        oid = self._storage.new_oid()
        version = "aVersion"
        revid_a = self._dostore(oid, data=MinPO(91))
        revid_b = self._dostore(oid, revid=revid_a, version=version,
                                data=MinPO(92))
        revid_c = self._dostore(oid, revid=revid_b, version=version,
                                data=MinPO(93))
        self._undo(self._storage.undoInfo()[0]['id'], oid)
        self._commitVersion(version, '')
        self._undo(self._storage.undoInfo()[0]['id'], oid)

        # now copy the records to a new storage
        self._dst.copyTransactionsFrom(self._storage)
        self.compare(self._storage, self._dst)

        # The last two transactions were applied directly rather than
        # copied.  So we can't use compare() to verify that the new
        # transactions are applied correctly.  (The new transactions
        # will have different timestamps for each storage.)

        self._abortVersion(version)
        self.assert_(self._storage.versionEmpty(version))
        self._undo(self._storage.undoInfo()[0]['id'], oid)
        self.assert_(not self._storage.versionEmpty(version))

        # check the data is what we expect it to be
        data, revid = self._storage.load(oid, version)
        self.assertEqual(zodb_unpickle(data), MinPO(92))
        data, revid = self._storage.load(oid, '')
        self.assertEqual(zodb_unpickle(data), MinPO(91))

        # and swap the storages
        tmp = self._storage
        self._storage = self._dst
        self._abortVersion(version)
        self.assert_(self._storage.versionEmpty(version))
        self._undo(self._storage.undoInfo()[0]['id'], oid)
        self.assert_(not self._storage.versionEmpty(version))

        # check the data is what we expect it to be
        data, revid = self._storage.load(oid, version)
        self.assertEqual(zodb_unpickle(data), MinPO(92))
        data, revid = self._storage.load(oid, '')
        self.assertEqual(zodb_unpickle(data), MinPO(91))

        # swap them back
        self._storage = tmp

        # Now remove _dst and copy all the transactions a second time.
        # This time we will be able to confirm via compare().
        self._dst.close()
        self._dst.cleanup()
        self._dst = self.new_dest()
        self._dst.copyTransactionsFrom(self._storage)
        self.compare(self._storage, self._dst)

    def checkRestoreAcrossPack(self):
        db = DB(self._storage)
        c = db.open()
        r = c.root()
        obj = r["obj1"] = MinPO(1)
        get_transaction().commit()
        obj = r["obj2"] = MinPO(1)
        get_transaction().commit()

        self._dst.copyTransactionsFrom(self._storage)
        self._dst.pack(time.time())

        self._undo(self._storage.undoInfo()[0]['id'])

        # copy the final transaction manually.  even though there
        # was a pack, the restore() ought to succeed.
        final = list(self._storage.iterator())[-1]
        self._dst.tpc_begin(final, final.tid, final.status)
        for r in final:
            self._dst.restore(r.oid, r.serial, r.data, r.version, r.data_txn,
                              final)
        self._dst.tpc_vote(final)
        self._dst.tpc_finish(final)


=== Added File Zope3/src/zodb/storage/tests/revision.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""Check loadSerial() on storages that support historical revisions."""

from zodb.storage.tests.minpo import MinPO
from zodb.storage.tests.base import zodb_unpickle, zodb_pickle

# The null oid/serial: eight zero bytes.
ZERO = '\x00' * 8

class RevisionStorage:
    """Check loadSerial() against a chain of historical revisions."""

    def checkLoadSerial(self):
        oid = self._storage.new_oid()
        prev = ZERO
        expected = {}
        # Commit seven successive revisions of a single object,
        # remembering which value went with which revid.
        for value in range(31, 38):
            prev = self._dostore(oid, revid=prev, data=MinPO(value))
            expected[prev] = MinPO(value)
        # Every recorded revision must load back to its own value.
        for revid, want in expected.items():
            data = self._storage.loadSerial(oid, revid)
            self.assertEqual(zodb_unpickle(data), want)
    


=== Added File Zope3/src/zodb/storage/tests/speed.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
# User-facing help text; typos fixed ("repetitions", "will be used",
# "each run").
usage="""Test speed of a ZODB storage

Options:

    -d file    The data file to use as input.
               The default is this script.

    -n n       The number of repetitions

    -s module  A module that defines a 'Storage'
               attribute, which is an open storage.
               If not specified, a FileStorage will be
               used.

    -z         Test compressing data

    -D         Run in debug mode

    -L         Test loads as well as stores by minimizing
               the cache after each run

    -M         Output means only
"""
  
import sys, os, getopt, string, time
sys.path.insert(0, os.getcwd())

import ZODB.DB, ZODB.FileStorage
import persistence
from transaction import get_transaction

# Bug fix: the module imports `persistence` (lowercase), so the
# capitalized `Persistence.Persistent` raised NameError at import time.
class P(persistence.Persistent):
    """Trivial persistent object used as the benchmark payload."""

def main(args):
    """Time repeated stores (and optionally loads) against a storage.

    See the module-level `usage` string for the command-line options.
    """

    opts, args = getopt.getopt(args, 'zd:n:Ds:LM')
    z=s=None
    data=sys.argv[0]
    nrep=5
    minimize=0
    detailed=1
    for o, v in opts:
        if o=='-n': nrep=string.atoi(v)
        elif o=='-d': data=v
        elif o=='-s': s=v
        elif o=='-z':
            # Compress the payload through compress() (defined below).
            global zlib
            import zlib
            z=compress
        elif o=='-L':
            minimize=1
        elif o=='-M':
            detailed=0
        elif o=='-D':
            global debug
            os.environ['STUPID_LOG_FILE']=''
            os.environ['STUPID_LOG_SEVERITY']='-999'

    # Open the storage: either the `Storage` attribute of the module
    # named by -s, or a fresh FileStorage.
    if s:
        s=__import__(s, globals(), globals(), ('__doc__',))
        s=s.Storage
    else:
        s=ZODB.FileStorage.FileStorage('zeo_speed.fs', create=1)

    data=open(data).read()
    db=ZODB.DB.DB(s, cache_size=4000)
    results={1:0, 10:0, 100:0, 1000:0}
    # nrep repetitions; each commits r objects for r in 1, 10, 100, 1000.
    for j in range(nrep):
        for r in 1, 10, 100, 1000:
            t=time.time()
            jar=db.open()
            get_transaction().begin()
            rt=jar.root()
            key='s%s' % r
            if rt.has_key(key): p=rt[key]
            else: rt[key]=p=P()
            for i in range(r):
                if z is not None: d=z(data)
                else: d=data
                v=getattr(p, str(i), P())
                v.d=d
                setattr(p,str(i),v)
            get_transaction().commit()
            jar.close()
            t=time.time()-t
            if detailed:
                sys.stderr.write("%s\t%s\t%.4f\n" % (j, r, t))
                sys.stdout.flush()
            results[r]=results[r]+t
            rt=d=p=v=None # release all references
            if minimize:
                # -L: shrink the cache so the next pass exercises loads.
                time.sleep(3)
                jar.cacheMinimize(3)

    if detailed: print '-'*24
    # Report mean total time and per-object time for each batch size.
    for r in 1, 10, 100, 1000:
        t=results[r]/nrep
        sys.stderr.write("mean:\t%s\t%.4f\t%.4f (s/o)\n" % (r, t, t/r))

    db.close()
            
    
def compress(s):
    """Return s compressed with zlib (module imported lazily in main)."""
    compressor = zlib.compressobj()
    head = compressor.compress(s)
    return head + compressor.flush()

if __name__ == '__main__':
    main(sys.argv[1:])


=== Added File Zope3/src/zodb/storage/tests/synchronization.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""Test the storage's implementation of the storage synchronization spec.

The Synchronization spec
    http://www.zope.org/Documentation/Developer/Models/ZODB/
    ZODB_Architecture_Storage_Interface_State_Synchronization_Diag.html

It specifies two states committing and non-committing.  A storage
starts in the non-committing state.  tpc_begin() transfers to the
committing state; tpc_abort() and tpc_finish() transfer back to
non-committing.

Several other methods are only allowed in one state or another.  Many
methods allowed only in the committing state require that they apply
to the currently committing transaction.

The spec is silent on a variety of methods that don't appear to modify
the state, e.g. load(), undoLog(), pack().  It's unclear whether there
is a separate set of synchronization rules that apply to these methods
or if the synchronization is implementation dependent, i.e. only what
is needed to guarantee a correct implementation.

The synchronization spec is also silent on whether there is any
contract implied with the caller.  If the storage can assume that a
single client is single-threaded and that it will not call, e.g., store()
until after it calls tpc_begin(), the implementation can be
substantially simplified.

New and/or unspecified methods:

tpc_vote(): handled like tpc_abort
transactionalUndo(): handled like undo()  (which is how?)

Methods that have nothing to do with committing/non-committing:
load(), loadSerial(), getName(), getSize(), __len__(), history(),
undoLog(), modifiedInVersion(), versionEmpty(), versions(), pack().

Specific questions:

The spec & docs say that undo() takes three arguments, the second
being a transaction.  If the specified arg isn't the current
transaction, the undo() should raise StorageTransactionError.  This
isn't implemented anywhere.  It looks like undo can be called at
anytime.

FileStorage does not allow undo() during a pack.  How should this be
tested?  Is it a general restriction?



"""

from zodb.ztransaction import Transaction
from zodb.interfaces import StorageTransactionError

# Arbitrary fixed identifiers fed to the storage API in the checks below.
VERSION = "testversion"
OID = "\x00" * 8
SERIALNO = "\x00" * 8
TID = "\x00" * 8

class SynchronizedStorage:
    """Verify the committing/non-committing state machine (see the module
    docstring): commit-phase methods must fail outside of tpc_begin(), and
    must reject a transaction other than the one being committed."""

##    def verifyCommitting(self, callable, *args):
##        self.assertRaises(StorageTransactionError, callable *args)

    def verifyNotCommitting(self, callable, *args):
        # Outside of two-phase commit, commit-only methods must raise
        # StorageTransactionError.  (Replaced py2-only apply() with a
        # direct *-call, matching verifyWrongTrans below.)
        self.assertRaises(StorageTransactionError, callable, *args)

    def verifyWrongTrans(self, callable, *args):
        # While t is committing, the same methods must refuse any
        # transaction that isn't t.
        t = Transaction()
        self._storage.tpc_begin(t)
        self.assertRaises(StorageTransactionError, callable, *args)
        self._storage.tpc_abort(t)

    def checkAbortVersionNotCommitting(self):
        self.verifyNotCommitting(self._storage.abortVersion,
                                 VERSION, Transaction())

    def checkAbortVersionWrongTrans(self):
        self.verifyWrongTrans(self._storage.abortVersion,
                              VERSION, Transaction())

    def checkCommitVersionNotCommitting(self):
        self.verifyNotCommitting(self._storage.commitVersion,
                                 VERSION, "", Transaction())

    def checkCommitVersionWrongTrans(self):
        self.verifyWrongTrans(self._storage.commitVersion,
                              VERSION, "", Transaction())

    def checkStoreNotCommitting(self):
        self.verifyNotCommitting(self._storage.store,
                                 OID, SERIALNO, "", "", Transaction())

    def checkStoreWrongTrans(self):
        self.verifyWrongTrans(self._storage.store,
                              OID, SERIALNO, "", "", Transaction())

##    def checkNewOidNotCommitting(self):
##        self.verifyNotCommitting(self._storage.new_oid)

##    def checkNewOidWrongTrans(self):
##        self.verifyWrongTrans(self._storage.new_oid)

    def checkAbortNotCommitting(self):
        # Aborting a transaction that was never begun must be harmless.
        self._storage.tpc_abort(Transaction())

    def checkAbortWrongTrans(self):
        # Aborting some other transaction must not disturb t's commit.
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.tpc_abort(Transaction())
        self._storage.tpc_abort(t)

    def checkFinishNotCommitting(self):
        # Finishing outside of a commit must be a no-op.
        t = Transaction()
        self._storage.tpc_finish(t)
        self._storage.tpc_abort(t)

    def checkFinishWrongTrans(self):
        # Finishing some other transaction must not commit t.
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.tpc_finish(Transaction())
        self._storage.tpc_abort(t)

    def checkBeginCommitting(self):
        # A second tpc_begin() with the same transaction is tolerated.
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.tpc_begin(t)
        self._storage.tpc_abort(t)

    # XXX how to check undo?


=== Added File Zope3/src/zodb/storage/tests/test_autopack.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

import os
import time
import unittest

from zodb.db import DB
from zodb.storage.tests.minpo import MinPO
from persistence import Persistent
from transaction import get_transaction

from zodb.storage.bdbfull import BDBFullStorage
from zodb.storage.bdbminimal import BDBMinimalStorage
from zodb.storage.base import BerkeleyConfig
from zodb.storage.tests.base import BerkeleyTestBase

ZERO = '\0'*8

class C(Persistent):
    """Minimal persistent object used to build object graphs in the tests."""



class TestAutopackBase(BerkeleyTestBase):
    """Shared machinery for the Berkeley autopack test cases."""

    def _config(self):
        """Return a config that autopacks every 3 seconds, packing away
        revisions older than 6 seconds, with classic packs disabled.
        """
        config = BerkeleyConfig()
        config.frequency = 3
        config.packtime = 6
        config.classicpack = 0
        return config

    def _wait_for_next_autopack(self):
        """Block until the storage's autopack thread finishes another run."""
        # BAW: this uses a non-public interface
        autopacker = self._storage._autopacker
        last_check = autopacker._nextcheck
        while autopacker._nextcheck == last_check:
            time.sleep(1)

    def _mk_dbhome(self, dir):
        """Create the db home directory and open the storage in it,
        removing the directory again if the storage cannot be created.
        """
        os.mkdir(dir)
        try:
            return self.ConcreteStorage(dir, config=self._config())
        except:
            self._zap_dbhome(dir)
            raise


class TestAutopack(TestAutopackBase):
    """Autopack without classic packing: old revisions are reclaimed but
    the current revision of every object is retained.
    """
    ConcreteStorage = BDBFullStorage

    def checkAutopack(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        # Wait for an autopack operation to occur, then make three revisions
        # to an object.  Wait for the next autopack operation and make sure
        # all three revisions still exist.  Then sleep 10 seconds and wait for
        # another autopack operation.  Then verify that the first two
        # revisions have been packed away.
        oid = storage.new_oid()
        self._wait_for_next_autopack()
        revid1 = self._dostore(oid, data=MinPO(2112))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(2113))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(2114))
        self._wait_for_next_autopack()
        unless(storage.loadSerial(oid, revid1))
        unless(storage.loadSerial(oid, revid2))
        unless(storage.loadSerial(oid, revid3))
        # Should be enough time for the revisions to get packed away
        # (packtime is 6 seconds in the base class's config).
        time.sleep(10)
        self._wait_for_next_autopack()
        # The first two revisions should now be gone, but the third should
        # still exist because it's the current revision, and we haven't done a
        # classic pack.
        raises(KeyError, self._storage.loadSerial, oid, revid1)
        raises(KeyError, self._storage.loadSerial, oid, revid2)
        unless(storage.loadSerial(oid, revid3))



class TestAutomaticClassicPack(TestAutopackBase):
    """Autopack with classic packing enabled: even the current revision of
    an object is reclaimed once it becomes unreferenced.
    """
    ConcreteStorage = BDBFullStorage

    def _config(self):
        config = BerkeleyConfig()
        # Autopack every 3 seconds, 6 seconds into the past, WITH classic
        # packing enabled (this overrides the base class's config, whose
        # classicpack is 0).
        config.frequency = 3
        config.packtime = 6
        config.classicpack = 1
        return config

    def checkAutomaticClassicPack(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        # Wait for an autopack operation to occur, then make three revisions
        # to an object.  Wait for the next autopack operation and make sure
        # all three revisions still exist.  Then sleep 10 seconds and wait for
        # another autopack operation.  Then verify that all three revisions
        # have been packed away.
        oid = storage.new_oid()
        self._wait_for_next_autopack()
        revid1 = self._dostore(oid, data=MinPO(2112))
        revid2 = self._dostore(oid, revid=revid1, data=MinPO(2113))
        revid3 = self._dostore(oid, revid=revid2, data=MinPO(2114))
        self._wait_for_next_autopack()
        unless(storage.loadSerial(oid, revid1))
        unless(storage.loadSerial(oid, revid2))
        unless(storage.loadSerial(oid, revid3))
        # Should be enough time for the revisions to get packed away
        time.sleep(10)
        self._wait_for_next_autopack()
        # Classic packing is enabled, so all three revisions -- including
        # the current one, since the object is not referenced from the root
        # -- should have been packed away.
        raises(KeyError, storage.loadSerial, oid, revid1)
        raises(KeyError, storage.loadSerial, oid, revid2)
        raises(KeyError, storage.loadSerial, oid, revid3)

    def checkCycleUnreachable(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        db = DB(storage)
        conn = db.open()
        root = conn.root()
        self._wait_for_next_autopack()
        # Store a two-object cycle reachable from the root
        obj1 = C()
        obj2 = C()
        obj1.obj = obj2
        obj2.obj = obj1
        root.obj = obj1
        txn = get_transaction()
        txn.note('root -> obj1 <-> obj2')
        txn.commit()
        oid1 = obj1._p_oid
        oid2 = obj2._p_oid
        assert oid1 and oid2 and oid1 <> oid2
        self._wait_for_next_autopack()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid1, ''))
        unless(storage.load(oid2, ''))
        # Now unlink it, which should still leave obj1 and obj2 alive
        # (they keep each other alive through the cycle)
        del root.obj
        txn = get_transaction()
        txn.note('root -X-> obj1 <-> obj2')
        txn.commit()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid1, ''))
        unless(storage.load(oid2, ''))
        # Do an explicit full pack to right now to collect all the old
        # revisions and the cycle.
        storage.pack(time.time())
        # And it should be packed away
        unless(storage.load(ZERO, ''))
        raises(KeyError, storage.load, oid1, '')
        raises(KeyError, storage.load, oid2, '')



class TestMinimalPack(TestAutopackBase):
    """BDBMinimalStorage garbage collection: simple unlinks are reclaimed
    by reference counting; cycles need an autopack (mark-and-sweep) run.
    """
    ConcreteStorage = BDBMinimalStorage

    def _config(self):
        config = BerkeleyConfig()
        # Autopack every 3 seconds (packtime/classicpack are irrelevant for
        # the minimal storage, which keeps no old revisions)
        config.frequency = 3
        return config

    def checkRootUnreachable(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        db = DB(storage)
        conn = db.open()
        root = conn.root()
        self._wait_for_next_autopack()
        # Store an object that's reachable from the root
        obj = C()
        obj.value = 999
        root.obj = obj
        txn = get_transaction()
        txn.note('root -> obj')
        txn.commit()
        oid = obj._p_oid
        assert oid
        self._wait_for_next_autopack()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid, ''))
        # Now unlink it
        del root.obj
        txn = get_transaction()
        txn.note('root -X-> obj')
        txn.commit()
        # The object should be gone due to reference counting
        unless(storage.load(ZERO, ''))
        raises(KeyError, storage.load, oid, '')

    def checkCycleUnreachable(self):
        unless = self.failUnless
        raises = self.assertRaises
        storage = self._storage
        db = DB(storage)
        conn = db.open()
        root = conn.root()
        self._wait_for_next_autopack()
        # Store a two-object cycle reachable from the root
        obj1 = C()
        obj2 = C()
        obj1.obj = obj2
        obj2.obj = obj1
        root.obj = obj1
        txn = get_transaction()
        txn.note('root -> obj1 <-> obj2')
        txn.commit()
        oid1 = obj1._p_oid
        oid2 = obj2._p_oid
        assert oid1 and oid2 and oid1 <> oid2
        self._wait_for_next_autopack()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid1, ''))
        unless(storage.load(oid2, ''))
        # Now unlink it, which should still leave obj1 and obj2 alive:
        # reference counting alone cannot collect the cycle
        del root.obj
        txn = get_transaction()
        txn.note('root -X-> obj1 <-> obj2')
        txn.commit()
        unless(storage.load(ZERO, ''))
        unless(storage.load(oid1, ''))
        unless(storage.load(oid2, ''))
        # But the next autopack should collect both obj1 and obj2
        self._wait_for_next_autopack()
        # And it should be packed away
        unless(storage.load(ZERO, ''))
        raises(KeyError, storage.load, oid1, '')
        raises(KeyError, storage.load, oid2, '')



def test_suite():
    """Aggregate every autopack test class into a single suite."""
    suite = unittest.TestSuite()
    for klass in (TestAutopack, TestAutomaticClassicPack, TestMinimalPack):
        suite.addTest(unittest.makeSuite(klass, 'check'))
    return suite



if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')


=== Added File Zope3/src/zodb/storage/tests/test_config.py ===
import os
import shutil
import tempfile
import unittest
from StringIO import StringIO

import ZConfig

from zodb import StorageConfig

class StorageTestCase(unittest.TestCase):
    """Test building storages from ZConfig <Storage> sections.

    Each test parses a sample configuration, checks the class and keyword
    arguments StorageConfig extracts from it, then actually creates the
    storage and verifies its type.
    """

    def setUp(self):
        unittest.TestCase.setUp(self)
        # The storage constructors create the file/directory themselves, so
        # only a fresh name is needed here.
        # NOTE(review): tempfile.mktemp() is race-prone; acceptable in a test.
        self.tmpfn = tempfile.mktemp()
        self.storage = None

    def tearDown(self):
        unittest.TestCase.tearDown(self)
        storage = self.storage
        self.storage = None
        try:
            if storage is not None:
                storage.close()
        except:
            # Best-effort close: a storage that failed half-way through a
            # test may not close cleanly, and that must not mask the result.
            pass
        try:
            # Full storage creates a directory
            if os.path.isdir(self.tmpfn):
                shutil.rmtree(self.tmpfn)
            else:
                os.remove(self.tmpfn)
        except os.error:
            pass

    def testFileStorage(self):
        from zodb.storage.file import FileStorage
        sample = """
        <Storage>
        type       FileStorage
        file_name  %s
        create     yes
        </Storage>
        """ % self.tmpfn
        io = StringIO(sample)
        rootconf = ZConfig.loadfile(io)
        storageconf = rootconf.getSection("Storage")
        cls, args = StorageConfig.getStorageInfo(storageconf)
        self.assertEqual(cls, FileStorage)
        self.assertEqual(args, {"file_name": self.tmpfn, "create": 1})
        self.storage = StorageConfig.createStorage(storageconf)
        self.assert_(isinstance(self.storage, FileStorage))

    def testZEOStorage(self):
        try:
            from zodb.zeo.client import ClientStorage
        except ImportError:
            # ZEO isn't available in this build; skip the test quietly.
            return
        sample = """
        <Storage>
        type       ClientStorage
        addr       zeo://www.python.org:9001
        wait       no
        </Storage>
        """
        io = StringIO(sample)
        rootconf = ZConfig.loadfile(io)
        storageconf = rootconf.getSection("Storage")
        cls, args = StorageConfig.getStorageInfo(storageconf)
        self.assertEqual(cls, ClientStorage)
        self.assertEqual(args, {"addr": [("www.python.org", 9001)], "wait": 0})
        self.storage = StorageConfig.createStorage(storageconf)
        self.assert_(isinstance(self.storage, ClientStorage))

    def testMappingStorage(self):
        from zodb.storage.mapping import MappingStorage
        sample = """
        <Storage>
        type       MappingStorage
        </Storage>
        """
        io = StringIO(sample)
        rootconf = ZConfig.loadfile(io)
        storageconf = rootconf.getSection("Storage")
        cls, args = StorageConfig.getStorageInfo(storageconf)
        self.assertEqual(cls, MappingStorage)
        self.assertEqual(args, {})
        self.storage = StorageConfig.createStorage(storageconf)
        self.assert_(isinstance(self.storage, MappingStorage))

    def testModuleStorage(self):
        # Test explicit module+class
        from zodb.storage.mapping import MappingStorage
        # Bug fix: the sample still used the pre-rename dotted path
        # ZODB.MappingStorage.MappingStorage, which no longer resolves
        # after the zodb package renaming.
        sample = """
        <Storage>
        type       zodb.storage.mapping.MappingStorage
        </Storage>
        """
        io = StringIO(sample)
        rootconf = ZConfig.loadfile(io)
        storageconf = rootconf.getSection("Storage")
        cls, args = StorageConfig.getStorageInfo(storageconf)
        self.assertEqual(cls, MappingStorage)
        self.assertEqual(args, {})
        self.storage = StorageConfig.createStorage(storageconf)
        self.assert_(isinstance(self.storage, MappingStorage))

    def testFullStorage(self):
        try:
            from zodb.storage.bdbfull import BDBFullStorage
        except ImportError:
            # Berkeley storage isn't available in this build.
            return
        sample = """
        <Storage>
        type       BDBFullStorage
        name       %s
        cachesize  1000
        </Storage>
        """ % self.tmpfn
        os.mkdir(self.tmpfn)
        io = StringIO(sample)
        rootconf = ZConfig.loadfile(io)
        storageconf = rootconf.getSection("Storage")
        cls, args = StorageConfig.getStorageInfo(storageconf)
        self.assertEqual(cls, BDBFullStorage)
        # It's too hard to test the config instance equality
        args = args.copy()
        del args['config']
        self.assertEqual(args, {"name": self.tmpfn})
        self.storage = StorageConfig.createStorage(storageconf)
        self.assert_(isinstance(self.storage, BDBFullStorage))
        # XXX _config isn't public
        # Bug fix: assert_() treats its second argument as a failure
        # message, so the original check passed for any truthy cachesize;
        # assertEqual actually compares the values.
        self.assertEqual(self.storage._config.cachesize, 1000)

    def testMinimalStorage(self):
        try:
            from zodb.storage.bdbminimal import BDBMinimalStorage
        except ImportError:
            # Berkeley storage isn't available in this build.
            return
        sample = """
        <Storage>
        type       BDBMinimalStorage
        name       %s
        cachesize  1000
        </Storage>
        """ % self.tmpfn
        os.mkdir(self.tmpfn)
        io = StringIO(sample)
        rootconf = ZConfig.loadfile(io)
        storageconf = rootconf.getSection("Storage")
        cls, args = StorageConfig.getStorageInfo(storageconf)
        self.assertEqual(cls, BDBMinimalStorage)
        # It's too hard to test the config instance equality
        args = args.copy()
        del args['config']
        self.assertEqual(args, {"name": self.tmpfn})
        self.storage = StorageConfig.createStorage(storageconf)
        self.assert_(isinstance(self.storage, BDBMinimalStorage))
        # XXX _config isn't public
        # Bug fix: same assert_()-with-message problem as testFullStorage.
        self.assertEqual(self.storage._config.cachesize, 1000)

def test_suite():
    """Return all StorageConfig tests."""
    return unittest.makeSuite(StorageTestCase)

if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')


=== Added File Zope3/src/zodb/storage/tests/test_create.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

# Unit test for database creation

import os
import time
import unittest

from zodb.storage.base import BerkeleyConfig
from BDBStorage.tests import BerkeleyTestBase
from zodb.storage.bdbfull import BDBFullStorage



class TestMixin:
    def checkDBHomeExists(self):
        # The concrete test base class is expected to have created the
        # database home directory during setUp().
        self.failUnless(os.path.isdir(BerkeleyTestBase.DBHOME))


class MinimalCreateTest(BerkeleyTestBase.MinimalTestBase, TestMixin):
    """Database-creation check against the minimal Berkeley storage."""


class FullCreateTest(BerkeleyTestBase.FullTestBase, TestMixin):
    """Database-creation check against the full Berkeley storage."""



class FullOpenExistingTest(BerkeleyTestBase.FullTestBase):
    """Verify that version data survives closing and reopening the storage."""

    def checkOpenWithExistingVersions(self):
        version = 'test-version'
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=7, version=version)
        # Now close the current storage and re-open it
        self._storage.close()
        self._storage = self.ConcreteStorage(BerkeleyTestBase.DBHOME)
        self.assertEqual(self._storage.modifiedInVersion(oid), version)

    def checkOpenAddVersion(self):
        eq = self.assertEqual
        version1 = 'test-version'
        oid1 = self._storage.new_oid()
        revid = self._dostore(oid1, data=7, version=version1)
        # Now close the current storage and re-open it
        self._storage.close()
        self._storage = self.ConcreteStorage(BerkeleyTestBase.DBHOME)
        eq(self._storage.modifiedInVersion(oid1), version1)
        # Now create a 2nd version string, then close/reopen
        version2 = 'new-version'
        oid2 = self._storage.new_oid()
        revid = self._dostore(oid2, data=8, version=version2)
        # Now close the current storage and re-open it
        self._storage.close()
        self._storage = self.ConcreteStorage(BerkeleyTestBase.DBHOME)
        eq(self._storage.modifiedInVersion(oid1), version1)
        # Both objects' version markers should have survived the reopen.
        eq(self._storage.modifiedInVersion(oid2), version2)



class FullOpenCloseTest(BerkeleyTestBase.FullTestBase):
    """Exercise opening and closing a storage that runs a checkpointing
    thread (checkpoint interval of 10 seconds).
    """

    def _mk_dbhome(self, dir):
        """Create the db home and open the storage with checkpointing on.

        Bug fix: the original did `config = BerkeleyConfig` (no call),
        binding the *class* and then setting `interval` on it -- mutating
        shared class state for every other user of BerkeleyConfig.  An
        instance must be configured instead.
        """
        config = BerkeleyConfig()
        config.interval = 10
        os.mkdir(dir)
        try:
            return self.ConcreteStorage(dir, config=config)
        except:
            self._zap_dbhome(dir)
            raise

    def checkCloseWithCheckpointingThread(self):
        # All the interesting stuff happens in the setUp and tearDown
        time.sleep(20)



class OpenRecoveryTest(BerkeleyTestBase.FullTestBase):
    def _mk_dbhome(self, dir):
        # Deliberately do not open a storage here: the check below opens the
        # storage itself, first with a bogus config and then a valid one.
        self._dir = dir

    def checkOpenWithBogusConfig(self):
        class C: pass
        c = C()
        # This instance won't have the necessary attributes, so the creation
        # will fail.  We want to be sure that everything gets cleaned up
        # enough to fix that and create a proper storage.
        self.assertRaises(AttributeError, BDBFullStorage, self._dir, config=c)
        c = BerkeleyConfig()
        s = BDBFullStorage(self._dir, config=c)
        s.close()



def test_suite():
    """Aggregate every creation/open/close test class in this module."""
    suite = unittest.TestSuite()
    for klass in (MinimalCreateTest, FullCreateTest, FullOpenExistingTest,
                  FullOpenCloseTest, OpenRecoveryTest):
        suite.addTest(unittest.makeSuite(klass, 'check'))
    return suite



if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')


=== Added File Zope3/src/zodb/storage/tests/test_file.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import zodb.storage.file
import sys, os, unittest
import errno
from zodb.ztransaction import Transaction
from zodb import POSException

from zodb.storage.tests import StorageTestBase, BasicStorage, \
     TransactionalUndoStorage, VersionStorage, \
     TransactionalUndoVersionStorage, PackableStorage, \
     Synchronization, ConflictResolution, HistoryStorage, \
     IteratorStorage, Corruption, RevisionStorage, PersistentStorage, \
     MTStorage, ReadOnlyStorage, RecoveryStorage
from zodb.storage.tests.base import MinPO, zodb_unpickle

class FileStorageTests(
    StorageTestBase.StorageTestBase,
    BasicStorage.BasicStorage,
    TransactionalUndoStorage.TransactionalUndoStorage,
    RevisionStorage.RevisionStorage,
    VersionStorage.VersionStorage,
    TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
    PackableStorage.PackableStorage,
    Synchronization.SynchronizedStorage,
    ConflictResolution.ConflictResolvingStorage,
    ConflictResolution.ConflictResolvingTransUndoStorage,
    HistoryStorage.HistoryStorage,
    IteratorStorage.IteratorStorage,
    IteratorStorage.ExtendedIteratorStorage,
    PersistentStorage.PersistentStorage,
    MTStorage.MTStorage,
    ReadOnlyStorage.ReadOnlyStorage
    ):
    """Run the full storage API test battery against FileStorage."""

    def open(self, **kwargs):
        # Bug fix: the pre-rename code referenced ZODB.FileStorage, which is
        # not bound in this module; use the zodb.storage.file module that is
        # imported at the top of the file.
        self._storage = zodb.storage.file.FileStorage('FileStorageTests.fs',
                                                      **kwargs)

    def setUp(self):
        StorageTestBase.removefs("FileStorageTests.fs")
        self.open(create=1)

    def tearDown(self):
        self._storage.close()
        StorageTestBase.removefs("FileStorageTests.fs")

    def checkLongMetadata(self):
        # Over-long transaction metadata (user, description) must be
        # rejected with a StorageError rather than silently stored.
        s = "X" * 75000
        try:
            self._dostore(user=s)
        except POSException.StorageError:
            pass
        else:
            self.fail("expect long user field to raise error")
        try:
            self._dostore(description=s)
        except POSException.StorageError:
            pass
        else:
            # Bug fix: the original message said "user" here too.
            self.fail("expect long description field to raise error")

class FileStorageRecoveryTest(
    StorageTestBase.StorageTestBase,
    RecoveryStorage.RecoveryStorage,
    ):
    """Recovery tests that copy transactions from Source.fs into Dest.fs."""

    def setUp(self):
        StorageTestBase.removefs("Source.fs")
        StorageTestBase.removefs("Dest.fs")
        # Bug fix: the pre-rename code referenced ZODB.FileStorage, which is
        # not bound in this module; use the zodb.storage.file module that is
        # imported at the top of the file.
        self._storage = zodb.storage.file.FileStorage('Source.fs')
        self._dst = zodb.storage.file.FileStorage('Dest.fs')

    def tearDown(self):
        self._storage.close()
        self._dst.close()
        StorageTestBase.removefs("Source.fs")
        StorageTestBase.removefs("Dest.fs")

    def new_dest(self):
        """Return a fresh, empty destination storage."""
        StorageTestBase.removefs('Dest.fs')
        return zodb.storage.file.FileStorage('Dest.fs')

def test_suite():
    """File storage API tests plus the corruption and recovery suites."""
    suite = unittest.makeSuite(FileStorageTests, 'check')
    suite.addTest(unittest.makeSuite(Corruption.FileStorageCorruptTests,
                                     'check'))
    suite.addTest(unittest.makeSuite(FileStorageRecoveryTest, 'check'))
    return suite

def main():
    """Run the whole suite with a plain text runner."""
    unittest.TextTestRunner().run(test_suite())



=== Added File Zope3/src/zodb/storage/tests/test_fsindex.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################

import unittest, sys
from zodb.storage.fsindex import fsIndex
from zodb.utils import p64


class Test(unittest.TestCase):
    """Unit tests for fsIndex, the mapping from 8-byte oids to offsets."""

    def testInserts(self):
        index=fsIndex()

        for i in range(200):
            index[p64(i*1000)]=(i*1000L+1)

        # Every inserted key must read back its stored value.
        for i in range(0,200):
            self.assertEqual((i,index[p64(i*1000)]), (i,(i*1000L+1)))
            
        self.assertEqual(len(index), 200)

        key=p64(2000)

        self.assertEqual(index.get(key), 2001)

        # A key that was never inserted: get() falls back to the default.
        key=p64(2001)
        self.assertEqual(index.get(key), None)
        self.assertEqual(index.get(key, ''), '')

        # self.failUnless(len(index._data) > 1)

    def testUpdate(self):
        index=fsIndex()
        d={}

        for i in range(200):
            d[p64(i*1000)]=(i*1000L+1)

        index.update(d)

        # Disjoint update: keys 400-599 are new.
        for i in range(400,600):
            d[p64(i*1000)]=(i*1000L+1)
        
        index.update(d)

        # Overlapping update: keys 100-499 get replacement values (+2).
        for i in range(100, 500):
            d[p64(i*1000)]=(i*1000L+2)
            
        index.update(d)

        self.assertEqual(index.get(p64(2000)), 2001)
        self.assertEqual(index.get(p64(599000)), 599001)
        self.assertEqual(index.get(p64(399000)), 399002)
        self.assertEqual(len(index), 600)


def test_suite():
    """Load every test from the Test case."""
    return unittest.TestLoader().loadTestsFromTestCase(Test)

if __name__=='__main__':
    unittest.TextTestRunner().run(test_suite())


=== Added File Zope3/src/zodb/storage/tests/test_mapping.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
import zodb.storage.mapping
import os, unittest

from zodb.storage.tests import StorageTestBase, BasicStorage, Synchronization

class MappingStorageTests(StorageTestBase.StorageTestBase,
                       BasicStorage.BasicStorage,
                       Synchronization.SynchronizedStorage,
                       ):
    """Basic storage API and synchronization tests for MappingStorage."""

    def setUp(self):
        # Bug fix: the pre-rename code referenced ZODB.MappingStorage, which
        # is not bound in this module; use the zodb.storage.mapping module
        # that is imported at the top of the file.
        self._storage = zodb.storage.mapping.MappingStorage()

    def tearDown(self):
        self._storage.close()

def test_suite():
    """Return the mapping storage tests ('check'-prefixed methods)."""
    return unittest.makeSuite(MappingStorageTests, 'check')

if __name__ == "__main__":
    check_loader = unittest.TestLoader()
    check_loader.testMethodPrefix = "check"
    unittest.main(testLoader=check_loader)
    


=== Added File Zope3/src/zodb/storage/tests/test_storage_api.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

# Unit tests for basic storage functionality

import unittest
# Import this here and now so that import failures properly cause the test
# suite to ignore these tests.
import bsddb3

from zodb import POSException

import zodb.storage.tests.base
from zodb.storage.tests.basic import BasicStorage
from zodb.storage.tests.revision import RevisionStorage
from zodb.storage.tests.version import VersionStorage
from zodb.storage.tests.undo import TransactionalUndoStorage
from zodb.storage.tests.undoversion import \
     TransactionalUndoVersionStorage
from zodb.storage.tests.packable import PackableStorage
from zodb.storage.tests.history import HistoryStorage
from zodb.storage.tests.iterator import IteratorStorage, ExtendedIteratorStorage
from zodb.storage.tests.recovery import RecoveryStorage
from zodb.storage.tests import ConflictResolution



class MinimalTest(BerkeleyTestBase.MinimalTestBase, BasicStorage):
    # NOTE(review): `BerkeleyTestBase` is not bound by this module's visible
    # imports (only `zodb.storage.tests.base` is imported as a module), so
    # this class statement would raise NameError -- confirm the intended
    # import after the renaming.
    def checkVersionedStoreAndLoad(self):
        # This storage doesn't support versions, so we should get an exception
        oid = self._storage.new_oid()
        self.assertRaises(POSException.Unsupported,
                          self._dostore,
                          oid, data=11, version='a version')


class FullTest(BerkeleyTestBase.FullTestBase, BasicStorage,
               RevisionStorage, VersionStorage,
               TransactionalUndoStorage,
               TransactionalUndoVersionStorage,
               PackableStorage,
               HistoryStorage,
               IteratorStorage, ExtendedIteratorStorage,
               ConflictResolution.ConflictResolvingStorage,
               ConflictResolution.ConflictResolvingTransUndoStorage):
    # Aggregates the whole storage API battery against BDBFullStorage.
    # NOTE(review): `BerkeleyTestBase` is not bound by this module's visible
    # imports -- confirm the intended import after the renaming.
    pass



# Directory for the second (destination) storage used by the recovery tests.
DST_DBHOME = 'test-dst'

class FullRecoveryTest(BerkeleyTestBase.FullTestBase,
                       RecoveryStorage):
    # Recovery tests copy transactions from self._storage into self._dst,
    # a second storage opened in DST_DBHOME.
    # NOTE(review): `BerkeleyTestBase` is not bound by this module's visible
    # imports -- confirm the intended import after the renaming.
    def setUp(self):
        BerkeleyTestBase.FullTestBase.setUp(self)
        self._zap_dbhome(DST_DBHOME)
        self._dst = self._mk_dbhome(DST_DBHOME)

    def tearDown(self):
        BerkeleyTestBase.FullTestBase.tearDown(self)
        self._zap_dbhome(DST_DBHOME)

    def new_dest(self):
        # Return a fresh, empty destination storage.
        self._zap_dbhome(DST_DBHOME)
        return self._mk_dbhome(DST_DBHOME)



def test_suite():
    """Aggregate the Berkeley storage API test classes."""
    suite = unittest.TestSuite()
    for klass in (FullTest, FullRecoveryTest, MinimalTest):
        suite.addTest(unittest.makeSuite(klass, 'check'))
    return suite



if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')


=== Added File Zope3/src/zodb/storage/tests/test_virgin.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
# 
##############################################################################

# Test creation of a brand new database, and insertion of root objects.

import unittest

from zodb.storage.tests.base import ZODBTestBase
from transaction import get_transaction
from persistence.dict import PersistentDict
        


class InsertMixin:
    """Checks run against a freshly created database root."""

    def checkIsEmpty(self):
        # A virgin root object must not contain our test key yet.
        self.failIf(self._root.has_key('names'))

    def checkNewInserts(self):
        # Hang a persistent mapping off the root and commit it.
        names = PersistentDict()
        self._root['names'] = names
        names['Warsaw'] = 'Barry'
        names['Hylton'] = 'Jeremy'
        get_transaction().commit()



class FullNewInsertsTest(ZODBTestBase, InsertMixin):
    """Run the insert checks against the full Berkeley storage."""
    from zodb.storage.bdbfull import BDBFullStorage
    ConcreteStorage = BDBFullStorage


class MinimalNewInsertsTest(ZODBTestBase, InsertMixin):
    """Run the insert checks against the minimal Berkeley storage."""
    from zodb.storage.bdbminimal import BDBMinimalStorage
    ConcreteStorage = BDBMinimalStorage



def test_suite():
    """Collect the minimal and full new-insert test classes."""
    suite = unittest.TestSuite()
    for klass in (MinimalNewInsertsTest, FullNewInsertsTest):
        suite.addTest(unittest.makeSuite(klass, 'check'))
    return suite



if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')


=== Added File Zope3/src/zodb/storage/tests/test_whitebox.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

# Whitebox testing of storage implementation details.

import unittest

from zodb.storage.tests.minpo import MinPO
from zodb.storage.tests.base import zodb_unpickle
from zodb.storage.bdbfull import BDBFullStorage
from zodb.storage.bdbminimal import BDBMinimalStorage
from zodb.storage.tests.base import BerkeleyTestBase
from zodb.storage.tests.base import ZODBTestBase

from persistence import Persistent
from transaction import get_transaction

ZERO = '\0'*8



class Object(Persistent):
    """Trivial persistent object used as a fixture by the whitebox tests."""



class WhiteboxLowLevelMinimal(BerkeleyTestBase):
    """Whitebox checks of BDBMinimalStorage's internal Berkeley tables."""

    ConcreteStorage = BDBMinimalStorage

    def checkTableConsistencyAfterCommit(self):
        """After three stores of one object, only the newest revision and
        no bookkeeping leftovers should remain in the internal tables.
        """
        # N.B. `unless` is bound to failIf, i.e. it asserts its argument
        # is false/empty.
        unless = self.failIf
        eq = self.assertEqual
        # Store three successive revisions of a single new object.
        oid = self._storage.new_oid()
        revid1 = self._dostore(oid, data=11)
        revid2 = self._dostore(oid, revid=revid1, data=12)
        revid3 = self._dostore(oid, revid=revid2, data=13)
        # First off, there should be no entries in the pending table
        unless(self._storage._pending.keys())
        # Also, there should be no entries in the oids table
        unless(self._storage._oids.keys())
        # Now, there should be exactly one oid in the serials table, and
        # exactly one record for that oid in the table too.
        oids = {}
        # Walk the serials table with a Berkeley cursor, grouping the
        # serials seen by oid; the cursor is closed even on failure.
        c = self._storage._serials.cursor()
        try:
            rec = c.first()
            while rec:
                oid, serial = rec
                oids.setdefault(oid, []).append(serial)
                rec = c.next()
        finally:
            c.close()
        eq(len(oids), 1)
        eq(len(oids[oids.keys()[0]]), 1)
        # There should now be exactly one entry in the pickles table.
        pickles = self._storage._pickles.items()
        eq(len(pickles), 1)
        # Pickle keys are oid (8 bytes) + serial (8 bytes).
        key, data = pickles[0]
        poid = key[:8]
        pserial = key[8:]
        eq(oid, poid)
        eq(revid3, pserial)
        # The surviving pickle must be the last revision stored (13).
        obj = zodb_unpickle(data)
        eq(obj.value, 13)
        # Now verify the refcounts table, which should be empty because the
        # stored object isn't referenced by any other objects.
        eq(len(self._storage._refcounts.keys()), 0)



class WhiteboxHighLevelMinimal(ZODBTestBase):
    """Whitebox checks that BDBMinimalStorage reclaims unreferenced
    revisions at commit time, leaving only the root object behind.
    """

    ConcreteStorage = BDBMinimalStorage

    def _assertOnlyRootSurvives(self):
        """Assert the internal tables now describe exactly one object.

        Shared verification stage (previously duplicated verbatim in both
        check methods): after an object graph is unlinked from the root
        and the transaction committed, the serials and pickles tables must
        hold only the root record, and the refcounts, oids and pending
        tables must all be empty.  The leading underscore keeps the
        'check' test collector from picking this helper up.
        """
        eq = self.assertEqual
        # The serials table should have exactly one entry, oid == 0.
        keys = self._storage._serials.keys()
        eq(len(keys), 1)
        eq(len(self._storage._serials.items()), 1)
        eq(keys[0], ZERO)
        # The pickles table should hold exactly one revision -- the root's.
        pickles = self._storage._pickles.items()
        eq(len(pickles), 1)
        key, data = pickles[0]
        eq(key[:8], ZERO)
        # And that pickle should have no 'obj' attribute.
        unobj = zodb_unpickle(data)
        self.failIf(hasattr(unobj, 'obj'))
        # Our refcounts table should have no entries in it, because the root
        # object is an island.  And of course, oids and pendings should be
        # empty too.
        eq(len(self._storage._refcounts.keys()), 0)
        eq(len(self._storage._oids.keys()), 0)
        eq(len(self._storage._pending.keys()), 0)

    def checkReferenceCounting(self):
        """Revisions of a dropped object are collected away on commit."""
        eq = self.assertEqual
        obj = MinPO(11)
        self._root.obj = obj
        get_transaction().commit()
        obj.value = 12
        get_transaction().commit()
        obj.value = 13
        get_transaction().commit()
        # Make sure the databases have what we expect: one current record
        # each for the root and the MinPO.
        eq(len(self._storage._serials.items()), 2)
        eq(len(self._storage._pickles.items()), 2)
        # And now refcount out the object
        del self._root.obj
        get_transaction().commit()
        self._assertOnlyRootSurvives()

    def checkRecursiveReferenceCounting(self):
        """Collection cascades down a chain of unreferenced objects."""
        eq = self.assertEqual
        # Build a four-deep chain of objects hanging off the root.
        obj1 = Object()
        obj2 = Object()
        obj3 = Object()
        obj4 = Object()
        self._root.obj = obj1
        obj1.obj = obj2
        obj2.obj = obj3
        obj3.obj = obj4
        get_transaction().commit()
        # Make sure the databases have what we expect: root + 4 objects.
        eq(len(self._storage._serials.items()), 5)
        eq(len(self._storage._pickles.items()), 5)
        # Unlinking the head of the chain should collect the whole chain.
        del self._root.obj
        get_transaction().commit()
        self._assertOnlyRootSurvives()



class WhiteboxHighLevelFull(ZODBTestBase):
    """Whitebox checks that BDBFullStorage keeps every revision (it is an
    undo-capable storage, unlike the minimal one).
    """

    ConcreteStorage = BDBFullStorage

    def checkReferenceCounting(self):
        """Old revisions survive deletion; only refcounts are adjusted."""
        eq = self.assertEqual
        # Make sure the databases have what we expect.  The full storage
        # starts with one record for the database-open revision of root.
        eq(len(self._storage._serials.items()), 1)
        eq(len(self._storage._pickles.items()), 1)
        # Now store an object
        obj = MinPO(11)
        self._root.obj = obj
        get_transaction().commit()
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 2)
        eq(len(self._storage._pickles.items()), 3)
        obj.value = 12
        get_transaction().commit()
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 2)
        eq(len(self._storage._pickles.items()), 4)
        obj.value = 13
        get_transaction().commit()
        # Make sure the databases have what we expect
        eq(len(self._storage._serials.items()), 2)
        eq(len(self._storage._pickles.items()), 5)
        # And now refcount out the object
        del self._root.obj
        get_transaction().commit()
        # Verification stage.  Our serials table should still have 2 entries,
        # one for the root object and one for the now unlinked MinPO obj.
        keys = self._storage._serials.keys()
        eq(len(keys), 2)
        eq(len(self._storage._serials.items()), 2)
        eq(keys[0], ZERO)
        # The pickles table should now have 6 entries, broken down like so:
        # - 3 revisions of the root object: the initial database-open
        #   revision, the revision that got its obj attribute set, and the
        #   revision that got its obj attribute deleted.
        # - 3 revisions of obj, corresponding to values 11, 12, and 13
        pickles = self._storage._pickles.items()
        eq(len(pickles), 6)
        # Our refcounts table should have one entry in it for the MinPO that's
        # referenced in an earlier revision of the root object
        eq(len(self._storage._refcounts.keys()), 1)
        # And of course, oids and pendings should be empty too
        eq(len(self._storage._oids.keys()), 0)
        eq(len(self._storage._pending.keys()), 0)



def test_suite():
    """Collect the 'check'-prefixed tests from every whitebox test case."""
    suite = unittest.TestSuite()
    for klass in (WhiteboxLowLevelMinimal,
                  WhiteboxHighLevelMinimal,
                  WhiteboxHighLevelFull):
        suite.addTest(unittest.makeSuite(klass, 'check'))
    return suite



if __name__ == '__main__':
    # Allow running this test module directly from the command line.
    unittest.main(defaultTest='test_suite')


=== Added File Zope3/src/zodb/storage/tests/test_zodb_simple.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
# 
##############################################################################

# Test some simple ZODB level stuff common to both the Minimal and Full
# storages, like transaction aborts and commits, changing objects, etc.
# Doesn't test undo, versions, or packing.

import time
import unittest
# Import this here and now so that import failures properly cause the test
# suite to ignore these tests.
import bsddb3

from zodb.storage.tests.base import ZODBTestBase
from transaction import get_transaction
from persistence.dict import PersistentDict



class CommitAndRead:
    """Mixin of chained sanity checks for commit/abort behavior.

    Each check method re-runs the previous one first, so every test
    rebuilds the expected database state from scratch before extending it.
    """

    def checkCommit(self):
        """Populate an empty root with a PersistentDict and commit."""
        # A freshly created database must start with an empty root.
        self.failUnless(not self._root)
        names = self._root['names'] = PersistentDict()
        names['Warsaw'] = 'Barry'
        names['Hylton'] = 'Jeremy'
        get_transaction().commit()

    def checkReadAfterCommit(self):
        """Committed values are readable; uncommitted keys are absent."""
        eq = self.assertEqual
        self.checkCommit()
        names = self._root['names']
        eq(names['Warsaw'], 'Barry')
        eq(names['Hylton'], 'Jeremy')
        self.failUnless(names.get('Drake') is None)

    def checkAbortAfterRead(self):
        """Make a change and abort it."""
        self.checkReadAfterCommit()
        names = self._root['names']
        names['Drake'] = 'Fred'
        get_transaction().abort()

    def checkReadAfterAbort(self):
        """The aborted insert from checkAbortAfterRead must not be visible."""
        self.checkAbortAfterRead()
        names = self._root['names']
        self.failUnless(names.get('Drake') is None)

    def checkChangingCommits(self):
        """Successive runs are spaced at least 3 seconds apart."""
        self.checkReadAfterAbort()
        now = time.time()
        # Make sure the last timestamp was more than 3 seconds ago
        timestamp = self._root.get('timestamp')
        if timestamp is None:
            timestamp = self._root['timestamp'] = 0
            get_transaction().commit()
        self.failUnless(now > timestamp + 3)
        self._root['timestamp'] = now
        # NOTE(review): the new timestamp is never explicitly committed
        # here; presumably test teardown commits or discards it -- confirm
        # against ZODBTestBase.
        time.sleep(3)



class MinimalCommitAndRead(ZODBTestBase, CommitAndRead):
    # Run the CommitAndRead checks against the minimal Berkeley storage.
    from zodb.storage.bdbminimal import BDBMinimalStorage
    ConcreteStorage = BDBMinimalStorage


class FullCommitAndRead(ZODBTestBase, CommitAndRead):
    # Run the CommitAndRead checks against the full Berkeley storage.
    from zodb.storage.bdbfull import BDBFullStorage
    ConcreteStorage = BDBFullStorage



def test_suite():
    """Collect the 'check'-prefixed tests for both storage flavours."""
    suite = unittest.TestSuite()
    for klass in (MinimalCommitAndRead, FullCommitAndRead):
        suite.addTest(unittest.makeSuite(klass, 'check'))
    return suite



if __name__ == '__main__':
    # Allow running this test module directly from the command line.
    unittest.main(defaultTest='test_suite')


=== Added File Zope3/src/zodb/storage/tests/timeiter.py ===
#! /usr/bin/env python

##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
# 
##############################################################################

"""Time transaction commits and normalize vs. pickle size and #objects.

Actually just counts the size of pickles in the transaction via the iterator
protocol, so storage overheads aren't counted.

Usage: %(PROGRAM)s [options]
Options:
    -h/--help
        Print this message and exit.

    -s filename
    --source=filename
        Use database in filename as the source (must be a FileStorage)

    -d filename
    --dest=filename
        Use database in filename as the destination (must be a BDB storage)

    -o filename
    --output=filename
        Print results in filename, otherwise stdout.

    -m txncount
    --max=txncount
        Stop after committing txncount transactions.

    -k txncount
    --skip=txncount
        Skip the first txncount transactions.

    -p/--profile
        Turn on specialized profiling.

    -q/--quiet
        Be quiet.
"""

import sys
import os
import getopt
import time
import errno
import profile
import traceback
import marshal

from bsddb3 import db

from zodb import utils
from zodb.timestamp import TimeStamp
from zodb.storage.file import FileStorage
from zodb.storage.bdbfull import BDBFullStorage

PROGRAM = sys.argv[0]
ZERO = '\0'*8



def usage(code, msg=''):
    """Print the module's usage text (plus optional error) and exit.

    The module docstring is %-interpolated against globals() to fill in
    PROGRAM.  `code` becomes the process exit status.
    """
    print >> sys.stderr, __doc__ % globals()
    if msg:
        print >> sys.stderr, msg
    sys.exit(code)



def main():
    """Parse the command line, open the source/destination storages, and
    time the transaction copy via doit().

    The destination is a BDBFullStorage; a commented-out alternative block
    below does a FileStorage->FileStorage copy instead.
    """
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'hs:d:qo:l:pm:k:',
                                   ['help', 'source=', 'dest=', 'quiet',
                                    'output=', 'logfile=', 'profile',
                                    'max=', 'skip='])
    except getopt.error, msg:
        usage(1, msg)

    # Simple attribute bag holding the parsed option values and the file
    # objects derived from them.
    class Options:
        source = None
        dest = None
        verbose = 1
        outfile = None
        logfile = None
        profilep = 0
        maxtxn = -1
        skiptxn = -1

    options = Options()

    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage(0)
        elif opt in ('-s', '--source'):
            options.source = arg
        elif opt in ('-d', '--dest'):
            options.dest = arg
        elif opt in ('-q', '--quiet'):
            options.verbose = 0
        elif opt in ('-o', '--output'):
            options.outfile = arg
        elif opt in ('-l', '--logfile'):
            options.logfile = arg
        elif opt in ('-p', '--profile'):
            options.profilep = 1
        elif opt in ('-m', '--max'):
            options.maxtxn = int(arg)
        elif opt in ('-k', '--skip'):
            options.skiptxn = int(arg)

    # No positional arguments are accepted.
    if args:
        usage(1)

    if not options.source or not options.dest:
        usage(1, 'Source and destination databases must be provided')

    # Open the output file.  The *closep flags remember whether we own the
    # stream and must close it in the finally block below.
    if options.outfile is None:
        options.outfp = sys.stdout
        options.outclosep = 0
    else:
        options.outfp = open(options.outfile, 'w')
        options.outclosep = 1

    # Open the logfile
    if options.logfile is None:
        options.logfp = sys.stdout
        options.logclosep = 0
    else:
        options.logfp = open(options.logfile, 'w')
        options.logclosep = 1

    # Print a comment, this is a hack
    print >> options.outfp, '# FS->BDB 3.3.11'
    print >> options.outfp, '#', time.ctime()

    print >>sys.stderr, 'Opening source FileStorage...'
    t0 = time.time()
    srcdb = FileStorage(options.source, read_only=1)
    t1 = time.time()
    print >>sys.stderr, 'Opening source FileStorage done. %s seconds' % (t1-t0)
#
# Uncomment this section to do a FS->BDB migration
#
    print >>sys.stderr, 'Opening destination BDB...'
    t0 = time.time()
    dstdb = BDBFullStorage(options.dest)
    t1 = time.time()
    print >>sys.stderr, 'Opening destination BDB done. %s seconds' % (t1-t0)

#
# Uncomment this section to do a FS->FS migration
#
##    print >>sys.stderr, 'Opening destination FileStorage...'
##    t0 = time.time()
##    dstdb = FileStorage(dest)
##    t1 = time.time()
##    print >>sys.stderr, 'Opening destination FileStorage done. %s seconds' % (
##        t1-t0)

    try:
        t0 = time.time()
        doit(srcdb, dstdb, options)
        t1 = time.time()
        print 'Total time:', t1-t0
    finally:
        # Done -- close both storages and any files we opened ourselves.
        srcdb.close()
        dstdb.close()
        if options.outclosep:
            options.outfp.close()
        if options.logclosep:
            options.logfp.close()



def doit(srcdb, dstdb, options):
    """Copy transactions from srcdb to dstdb, timing each 2PC phase.

    For every copied transaction a line is written to options.outfp with
    the tid, object count, total pickle size, and per-phase timings; a
    final summary line records the largest pickle/transaction seen.  Every
    100th transaction is optionally profiled (-p) with stats dumped to
    profile-NN.txt.
    """
    outfp = options.outfp
    logfp = options.logfp
    profilep = options.profilep
    verbose = options.verbose
    # some global information
    largest_pickle = 0
    largest_txn_in_size = 0
    largest_txn_in_objects = 0
    # Ripped from BaseStorage.copyTransactionsFrom()
    ts = None
    ok = 1
    prevrevids = {}
    counter = 0
    skipper = 0
    for txn in srcdb.iterator():
        # Honor -k (skip first N) and -m (stop after N) options.
        skipper += 1
        if skipper <= options.skiptxn:
            continue
        counter += 1
        if counter > options.maxtxn > 0:
            break
        tid = txn.tid
        # Detect out-of-order timestamps in the source; when found, invent
        # a later tid and warn once until order is restored.
        if ts is None:
            ts = TimeStamp(tid)
        else:
            t = TimeStamp(tid)
            if t <= ts:
                if ok:
                    print 'Time stamps are out of order %s, %s' % (ts, t)
                    ok = 0
                    ts = t.laterThan(ts)
                    tid = `ts`
                else:
                    ts = t
                    if not ok:
                        print 'Time stamps are back in order %s' % t
                        ok = 1
        if verbose:
            print ts

        prof = None
        if profilep and (counter % 100) == 0:
            prof = profile.Profile()
        objects = 0
        size = 0
        t0 = time.time()
        dstdb.tpc_begin(txn, tid, txn.status)
        t1 = time.time()
        try:
            for r in txn:
                oid = r.oid
                objects += 1
		thissize = len(r.data)
                size += thissize
		if thissize > largest_pickle:
		    largest_pickle = thissize
                if verbose:
                    if not r.version:
                        vstr = 'norev'
                    else:
                        vstr = r.version
                    print utils.U64(oid), vstr, len(r.data)
                # Store against the previous revid we wrote for this oid,
                # not the source's, since tids may have been rewritten.
                oldrevid = prevrevids.get(oid, ZERO)
                newrevid = dstdb.store(oid, oldrevid, r.data, r.version, txn)
                prevrevids[oid] = newrevid
            t2 = time.time()
            dstdb.tpc_vote(txn)
            t3 = time.time()
            # Profile every 100 transactions
            if prof:
                prof.runcall(dstdb.tpc_finish, txn)
            else:
                dstdb.tpc_finish(txn)
            t4 = time.time()
        except KeyError, e:
            # NOTE(review): if this fires before t4 is assigned, the print
            # below raises NameError (t2/t3/t4 unbound) -- latent bug.
            traceback.print_exc(file=logfp)

        # record the results
	if objects > largest_txn_in_objects:
	    largest_txn_in_objects = objects
	if size > largest_txn_in_size:
	    largest_txn_in_size = size
        print >> outfp, utils.U64(tid), objects, size, t4-t0, \
              t1-t0, t2-t1, t3-t2, t4-t3

        if prof:
            prof.create_stats()
            fp = open('profile-%02d.txt' % (counter / 100), 'wb')
            marshal.dump(prof.stats, fp)
            fp.close()
    print >> outfp, largest_pickle, largest_txn_in_size, largest_txn_in_objects



if __name__ == '__main__':
    main()


=== Added File Zope3/src/zodb/storage/tests/timepickles.py ===
#! /usr/bin/env python

##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
# 
##############################################################################

"""Time transaction commits and normalize vs. pickle size and #objects.

Actually just counts the size of pickles in the transaction via the iterator
protocol, so storage overheads aren't counted.

Usage: %(PROGRAM)s [options]
Options:
    -h/--help
        Print this message and exit.

    -s filename
    --source=filename
        Use database in filename as the source (must be a FileStorage)

    -d filename
    --dest=filename
        Use database in filename as the destination (must be a BDB storage)

    -o filename
    --output=filename
        Print results in filename, otherwise stdout.

    -m txncount
    --max=txncount
        Stop after committing txncount transactions.

    -k txncount
    --skip=txncount
        Skip the first txncount transactions.

    -p/--profile
        Turn on specialized profiling.

    -q/--quiet
        Be quiet.
"""

import sys
import os
import getopt
import time
import errno
import profile
import traceback
import marshal

from bsddb3 import db

from zodb import utils
from zodb.timestamp import TimeStamp
from zodb.storage.file import FileStorage
from zodb.storage.bdbfull import BDBFullStorage

PROGRAM = sys.argv[0]
ZERO = '\0'*8



def usage(code, msg=''):
    """Print the module's usage text (plus optional error) and exit.

    The module docstring is %-interpolated against globals() to fill in
    PROGRAM.  `code` becomes the process exit status.
    """
    print >> sys.stderr, __doc__ % globals()
    if msg:
        print >> sys.stderr, msg
    sys.exit(code)



def main():
    """Parse the command line, open the source storage, and time raw
    pickle writes via doit().

    Unlike timeiter.py, the destination storage is NOT opened here (the
    BDBFullStorage line is commented out and dstdb is None); doit() opens
    its own Berkeley DB environment instead.
    """
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'hs:d:qo:l:pm:k:',
                                   ['help', 'source=', 'dest=', 'quiet',
                                    'output=', 'logfile=', 'profile',
                                    'max=', 'skip='])
    except getopt.error, msg:
        usage(1, msg)

    # Simple attribute bag holding the parsed option values and the file
    # objects derived from them.
    class Options:
        source = None
        dest = None
        verbose = 1
        outfile = None
        logfile = None
        profilep = 0
        maxtxn = -1
        skiptxn = -1

    options = Options()

    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage(0)
        elif opt in ('-s', '--source'):
            options.source = arg
        elif opt in ('-d', '--dest'):
            options.dest = arg
        elif opt in ('-q', '--quiet'):
            options.verbose = 0
        elif opt in ('-o', '--output'):
            options.outfile = arg
        elif opt in ('-l', '--logfile'):
            options.logfile = arg
        elif opt in ('-p', '--profile'):
            options.profilep = 1
        elif opt in ('-m', '--max'):
            options.maxtxn = int(arg)
        elif opt in ('-k', '--skip'):
            options.skiptxn = int(arg)

    # No positional arguments are accepted.
    if args:
        usage(1)

    if not options.source or not options.dest:
        usage(1, 'Source and destination databases must be provided')

    # Open the output file.  The *closep flags remember whether we own the
    # stream and must close it in the finally block below.
    if options.outfile is None:
        options.outfp = sys.stdout
        options.outclosep = 0
    else:
        options.outfp = open(options.outfile, 'w')
        options.outclosep = 1

    # Open the logfile
    if options.logfile is None:
        options.logfp = sys.stdout
        options.logclosep = 0
    else:
        options.logfp = open(options.logfile, 'w')
        options.logclosep = 1

    # Print a comment, this is a hack
    print >> options.outfp, '# FS->BDB 3.3.11'
    print >> options.outfp, '#', time.ctime()

    print >>sys.stderr, 'Opening source FileStorage...'
    t0 = time.time()
    srcdb = FileStorage(options.source, read_only=1)
    t1 = time.time()
    print >>sys.stderr, 'Opening source FileStorage done. %s seconds' % (t1-t0)
#
# Uncomment this section to do a FS->BDB migration
#
    print >>sys.stderr, 'Opening destination BDB...'
    t0 = time.time()
##    dstdb = BDBFullStorage(options.dest)
    dstdb = None
    t1 = time.time()
    print >>sys.stderr, 'Opening destination BDB done. %s seconds' % (t1-t0)

#
# Uncomment this section to do a FS->FS migration
#
##    print >>sys.stderr, 'Opening destination FileStorage...'
##    t0 = time.time()
##    dstdb = FileStorage(dest)
##    t1 = time.time()
##    print >>sys.stderr, 'Opening destination FileStorage done. %s seconds' % (
##        t1-t0)

    try:
        t0 = time.time()
        doit(srcdb, dstdb, options)
        t1 = time.time()
        print 'Total time:', t1-t0
    finally:
        # Done -- close the source and any files we opened ourselves.
        srcdb.close()
##        dstdb.close()
        if options.outclosep:
            options.outfp.close()
        if options.logclosep:
            options.logfp.close()



def doit(srcdb, dstdb, options):
    """Time raw Berkeley DB pickle writes for each source transaction.

    Variant of timeiter.doit() that bypasses the ZODB storage layer: it
    opens its own transactional Berkeley environment ('BDB' directory)
    and writes each record's pickle under key oid+tid directly into a
    btree.  `dstdb` is unused here.  Timing lines go to options.outfp.
    """
    outfp = options.outfp
    logfp = options.logfp
    profilep = options.profilep
    verbose = options.verbose
    # some global information
    largest_pickle = 0
    largest_txn_in_size = 0
    largest_txn_in_objects = 0
    # Ripped from BaseStorage.copyTransactionsFrom()
    ts = None
    ok = 1
    prevrevids = {}
    counter = 0
    skipper = 0

    from bsddb3 import db
    env = db.DBEnv()
    env.open('BDB', 
        db.DB_CREATE       # create underlying files as necessary
        | db.DB_RECOVER    # run normal recovery before opening
        | db.DB_INIT_MPOOL # initialize shared memory buffer pool
        | db.DB_INIT_LOCK  # initialize locking subsystem
        | db.DB_INIT_TXN   # initialize transaction subsystem
        | db.DB_THREAD     # we use the environment from other threads
        )
    d = db.DB(env)
    d.open('zodb_picklesauce', db.DB_BTREE, db.DB_CREATE)

    for txn in srcdb.iterator():
        # Honor -k (skip first N) and -m (stop after N) options.
        skipper += 1
        if skipper <= options.skiptxn:
            continue
        counter += 1
        if counter > options.maxtxn > 0:
            break
        tid = txn.tid
        # Detect out-of-order timestamps in the source; when found, invent
        # a later tid and warn once until order is restored.
        if ts is None:
            ts = TimeStamp(tid)
        else:
            t = TimeStamp(tid)
            if t <= ts:
                if ok:
                    print 'Time stamps are out of order %s, %s' % (ts, t)
                    ok = 0
                    ts = t.laterThan(ts)
                    tid = `ts`
                else:
                    ts = t
                    if not ok:
                        print 'Time stamps are back in order %s' % t
                        ok = 1
        if verbose:
            print ts

        prof = None
        if profilep and (counter % 100) == 0:
            prof = profile.Profile()
        objects = 0
        size = 0
        t0 = time.time()

        t1 = time.time()
        try:
	    dbtxn = env.txn_begin()
            for r in txn:
                oid = r.oid
                objects += 1
		thissize = len(r.data)
                size += thissize
		if thissize > largest_pickle:
		    largest_pickle = thissize
                if verbose:
                    if not r.version:
                        vstr = 'norev'
                    else:
                        vstr = r.version
                    print utils.U64(oid), vstr, len(r.data)
		key = oid + tid
		d.put(key, r.data, txn=dbtxn)
            t2 = time.time()
            t3 = time.time()
	    dbtxn.commit()
            t4 = time.time()
        except KeyError, e:
            # NOTE(review): if this fires before t4 is assigned, the print
            # below raises NameError (t2/t3/t4 unbound) -- latent bug.
            traceback.print_exc(file=logfp)

        # record the results
	if objects > largest_txn_in_objects:
	    largest_txn_in_objects = objects
	if size > largest_txn_in_size:
	    largest_txn_in_size = size
        print >> outfp, utils.U64(tid), objects, size, t4-t0, \
              t1-t0, t2-t1, t3-t2, t4-t3

        if prof:
            prof.create_stats()
            fp = open('profile-%02d.txt' % (counter / 100), 'wb')
            marshal.dump(prof.stats, fp)
            fp.close()
    d.close()
    print >> outfp, largest_pickle, largest_txn_in_size, largest_txn_in_objects



if __name__ == '__main__':
    main()


=== Added File Zope3/src/zodb/storage/tests/undo.py === (531/631 lines abridged)
"""Check transactionalUndo().

Any storage that supports transactionalUndo() must pass these tests.
"""

import time
import types

from zodb import POSException
from zodb.ztransaction import Transaction
from zodb.utils import u64, p64, z64
from zodb.db import DB

from zodb.storage.tests.minpo import MinPO
from zodb.storage.tests.base import zodb_pickle, zodb_unpickle

from persistence import Persistent
from transaction import get_transaction

class C(Persistent):
    """Trivial persistent class used as fodder for the undo tests."""
    pass

class TransactionalUndoStorage:

    def _transaction_begin(self):
        self.__serials = {}

    def _transaction_store(self, oid, rev, data, vers, trans):
        r = self._storage.store(oid, rev, data, vers, trans)
        if r:
            if type(r) == types.StringType:
                self.__serials[oid] = r
            else:
                for oid, serial in r:
                    self.__serials[oid] = serial

    def _transaction_vote(self, trans):
        r = self._storage.tpc_vote(trans)
        if r:
            for oid, serial in r:
                self.__serials[oid] = serial

    def _transaction_newserial(self, oid):
        return self.__serials[oid]

    def _multi_obj_transaction(self, objs):
        newrevs = {}
        t = Transaction()
        self._storage.tpc_begin(t)
        self._transaction_begin()

[-=- -=- -=- 531 lines omitted -=- -=- -=-]

                tid = info[base + j]['id']
                s.transactionalUndo(tid, t)
            s.tpc_vote(t)
            s.tpc_finish(t)
        
        for i in range(BATCHES):
            undo(i)

        # There are now (2 + OBJECTS) * BATCHES transactions:
        #     BATCHES original transactions, followed by
        #     OBJECTS * BATCHES modifications, followed by
        #     BATCHES undos

        iter = s.iterator()
        offset = 0

        eq = self.assertEqual

        for i in range(BATCHES):
            txn = iter[offset]
            offset += 1
            
            tid = p64(i + 1)
            eq(txn.tid, tid)

            L1 = [(rec.oid, rec.serial, rec.data_txn) for rec in txn]
            L2 = [(oid, revid, None) for _tid, oid, revid in orig
                  if _tid == tid]
            
            eq(L1, L2)

        for i in range(BATCHES * OBJECTS):
            txn = iter[offset]
            offset += 1
            eq(len([rec for rec in txn if rec.data_txn is None]), 1)

        for i in range(BATCHES):
            txn = iter[offset]
            offset += 1

            # The undos are performed in reverse order.
            otid = p64(BATCHES - i)
            L1 = [(rec.oid, rec.data_txn) for rec in txn]
            L2 = [(oid, otid) for _tid, oid, revid in orig
                  if _tid == otid]
            L1.sort()
            L2.sort()
            eq(L1, L2)

        self.assertRaises(IndexError, iter.__getitem__, offset)


=== Added File Zope3/src/zodb/storage/tests/undoversion.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
# Check interactions between transactionalUndo() and versions.  Any storage
# that supports both transactionalUndo() and versions must pass these tests.

from zodb import POSException
from zodb.ztransaction import Transaction
from zodb.storage.tests.minpo import MinPO
from zodb.storage.tests.base import zodb_unpickle


class TransactionalUndoVersionStorage:
    """Check interactions between transactionalUndo() and versions.

    Mixin for a unittest.TestCase that provides self._storage and
    self._dostore().  Any storage that supports both transactionalUndo()
    and versions must pass these tests.
    """

    def checkUndoInVersion(self):
        # Use unittest assertion methods instead of bare `assert'
        # statements: bare asserts are stripped under `python -O' and
        # give poor failure messages.  This also matches the style of
        # the VersionStorage tests.
        eq = self.assertEqual
        unless = self.failUnless
        oid = self._storage.new_oid()
        version = 'one'
        # One revision on the trunk, then two revisions in the version.
        revid_a = self._dostore(oid, data=MinPO(91))
        revid_b = self._dostore(oid, revid=revid_a, data=MinPO(92),
                                version=version)
        revid_c = self._dostore(oid, revid=revid_b, data=MinPO(93),
                                version=version)
        # Undo the most recent (version) store.
        info = self._storage.undoInfo()
        tid = info[0]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        # Trunk is untouched; the version is rolled back one revision.
        data, revid = self._storage.load(oid, '')
        eq(revid, revid_a)
        eq(zodb_unpickle(data), MinPO(91))
        data, revid = self._storage.load(oid, version)
        unless(revid > revid_b and revid > revid_c)
        eq(zodb_unpickle(data), MinPO(92))
        # Now commit the version...
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.commitVersion(version, '', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        # ...after which trunk and version agree.
        data, revid = self._storage.load(oid, version)
        eq(zodb_unpickle(data), MinPO(92))
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(92))
        # ...and undo the commit
        info = self._storage.undoInfo()
        tid = info[0]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        data, revid = self._storage.load(oid, version)
        eq(zodb_unpickle(data), MinPO(92))
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(91))
        # Now abort the version
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.abortVersion(version, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        # The version now reflects the trunk data.
        data, revid = self._storage.load(oid, version)
        eq(zodb_unpickle(data), MinPO(91))
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(91))
        # Now undo the abort
        info = self._storage.undoInfo()
        tid = info[0]['id']
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.transactionalUndo(tid, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        # And the object should be back in versions 'one' and ''
        data, revid = self._storage.load(oid, version)
        eq(zodb_unpickle(data), MinPO(92))
        data, revid = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(91))


=== Added File Zope3/src/zodb/storage/tests/version.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""Run the version related tests for a storage.

Any storage that supports versions should be able to pass all these tests.
"""

from zodb import POSException
from zodb.ztransaction import Transaction
from zodb.storage.tests.minpo import MinPO
from zodb.storage.tests.base import zodb_unpickle


class VersionStorage:
    def checkVersionedStoreAndLoad(self):
        """Trunk and version loads return their respective latest data."""
        eq = self.assertEqual
        # Two revisions on the trunk...
        oid = self._storage.new_oid()
        serial = self._dostore(oid, data=MinPO(11))
        serial = self._dostore(oid, revid=serial, data=MinPO(12))
        # ...followed by three revisions in a version.
        version = 'test-version'
        for value in (13, 14, 15):
            serial = self._dostore(oid, revid=serial, data=MinPO(value),
                                   version=version)
        # The trunk still sees 12; the version sees the last store.
        data, serial = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(12))
        data, vserial = self._storage.load(oid, version)
        eq(zodb_unpickle(data), MinPO(15))
        if hasattr(self._storage, 'getSerial'):
            s = self._storage.getSerial(oid)
            eq(s, max(serial, vserial))

    def checkVersionedLoadErrors(self):
        """Bogus oids raise KeyError; bogus versions fall back to trunk."""
        eq = self.assertEqual
        oid = self._storage.new_oid()
        version = 'test-version'
        serial = self._dostore(oid, data=MinPO(11))
        serial = self._dostore(oid, revid=serial, data=MinPO(12),
                               version=version)
        # An oid that was never stored cannot be loaded.
        self.assertRaises(KeyError, self._storage.load,
                          self._storage.new_oid(), '')
        # Loading from an unknown version returns the trunk data.
        data, serial = self._storage.load(oid, 'bogus')
        eq(zodb_unpickle(data), MinPO(11))


    def checkVersionLock(self):
        """Storing into a second version while locked by a first fails."""
        oid = self._storage.new_oid()
        serial = self._dostore(oid, data=MinPO(11))
        serial = self._dostore(oid, revid=serial, data=MinPO(12),
                               version='test-version')
        # The object is now locked by 'test-version', so a store into a
        # different version must raise VersionLockError.
        self.assertRaises(POSException.VersionLockError,
                          self._dostore,
                          oid, revid=serial, data=MinPO(14),
                          version='another-version')

    def checkVersionEmpty(self):
        """versionEmpty() is true only for versions with no stores."""
        version = 'test-version'
        # Before anything is stored, the version ought to be empty.
        self.failUnless(self._storage.versionEmpty(version))
        # Store two trunk revisions and two version revisions.
        oid = self._storage.new_oid()
        serial = self._dostore(oid, data=MinPO(11))
        serial = self._dostore(oid, revid=serial, data=MinPO(12))
        for value in (13, 14):
            serial = self._dostore(oid, revid=serial, data=MinPO(value),
                                   version=version)
        # 'test-version' is no longer empty...
        self.failIf(self._storage.versionEmpty(version))
        # ...but a non-existent version still is.
        self.failUnless(self._storage.versionEmpty('bogus'))

    def checkVersions(self):
        """versions() reports active versions and honors its max argument."""
        unless = self.failUnless
        # Create three objects on the trunk...
        oids = []
        revids = []
        for i in range(3):
            oid = self._storage.new_oid()
            oids.append(oid)
            revids.append(self._dostore(oid, data=MinPO(11 + i)))
        # ...then modify each one in its own version.
        names = ('one', 'two', 'three')
        for i in range(3):
            self._dostore(oids[i], revid=revids[i], data=MinPO(14 + i),
                          version=names[i])
        # All three versions should be reported.
        versions = self._storage.versions()
        for name in names:
            unless(name in versions)
        # With the `max' argument, exactly one of them comes back.
        versions = self._storage.versions(1)
        self.assertEqual(len(versions), 1)
        unless('one' in versions or 'two' in versions or 'three' in versions)

    def _setup_version(self, version='test-version'):
        """Store three trunk revisions, then three revisions in `version'.

        Returns (oid, version).  Afterwards the trunk state of the
        object is MinPO(51) and the version state is MinPO(54).
        """
        # NOTE: the original bound an unused `nvrevid' alias to the last
        # trunk revid; it was never read, so it has been dropped.
        oid = self._storage.new_oid()
        revid = self._dostore(oid, data=MinPO(49))
        revid = self._dostore(oid, revid=revid, data=MinPO(50))
        revid = self._dostore(oid, revid=revid, data=MinPO(51))
        # Now do some stores in the version.
        for value in (52, 53, 54):
            revid = self._dostore(oid, revid=revid, data=MinPO(value),
                                  version=version)
        return oid, version

    def checkAbortVersion(self):
        """Aborting a version restores the trunk data."""
        eq = self.assertEqual
        oid, version = self._setup_version()

        # XXX Not sure I can write a test for getSerial() in the
        # presence of aborted versions, because FileStorage and
        # Berkeley storage give a different answer. I think Berkeley
        # is right and FS is wrong.

        # Aborting a version must happen inside a transaction.
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.abortVersion(version, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        # Only the last trunk revision remains visible.
        data, serial = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(51))

    def checkAbortVersionErrors(self):
        """Aborting the empty version fails; a real abort still works."""
        eq = self.assertEqual
        oid, version = self._setup_version()
        t = Transaction()
        self._storage.tpc_begin(t)

        # Trying to abort the empty (trunk) version is an error.
        if (hasattr(self._storage, 'supportsTransactionalUndo')
            and self._storage.supportsTransactionalUndo()):
            # XXX FileStorage used to be broken on this one
            self.assertRaises(POSException.VersionError,
                              self._storage.abortVersion,
                              '', t)

        # Aborting the real version in the same transaction succeeds.
        oids = self._storage.abortVersion(version, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid)
        data, serial = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(51))

    def checkCommitVersionErrors(self):
        """Committing a version into itself raises VersionCommitError."""
        if not (hasattr(self._storage, 'supportsTransactionalUndo')
                and self._storage.supportsTransactionalUndo()):
            # XXX FileStorage used to be broken on this one
            return
        eq = self.assertEqual
        oid1, version1 = self._setup_version('one')
        data, revid1 = self._storage.load(oid1, version1)
        eq(zodb_unpickle(data), MinPO(54))
        t = Transaction()
        self._storage.tpc_begin(t)
        # Make sure the transaction is aborted even if the check fails.
        try:
            self.assertRaises(POSException.VersionCommitError,
                              self._storage.commitVersion,
                              'one', 'one', t)
        finally:
            self._storage.tpc_abort(t)

    def checkModifyAfterAbortVersion(self):
        """An object is still modifiable after its version was aborted."""
        eq = self.assertEqual
        oid, version = self._setup_version()
        # Abort the version.
        t = Transaction()
        self._storage.tpc_begin(t)
        self._storage.abortVersion(version, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        # Load the current trunk state to pick up a fresh revid...
        data, serial = self._storage.load(oid, '')
        # ...and modify the object a few more times on the trunk.
        for value in (52, 53, 54):
            serial = self._dostore(oid, revid=serial, data=MinPO(value))
        data, newserial = self._storage.load(oid, '')
        eq(newserial, serial)
        eq(zodb_unpickle(data), MinPO(54))

    def checkCommitToNonVersion(self):
        """Committing a version to '' moves its data onto the trunk."""
        eq = self.assertEqual
        oid, version = self._setup_version()
        # Sanity check: version and trunk currently disagree.
        data, serial = self._storage.load(oid, version)
        eq(zodb_unpickle(data), MinPO(54))
        data, serial = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(51))
        # Commit the version into the empty (trunk) version.
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.commitVersion(version, '', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        # The trunk now carries the version's data.
        data, serial = self._storage.load(oid, '')
        eq(zodb_unpickle(data), MinPO(54))

    def checkCommitToOtherVersion(self):
        """Committing version 'one' into 'two' moves the object across."""
        eq = self.assertEqual
        oid1, version1 = self._setup_version('one')
        data, revid1 = self._storage.load(oid1, version1)
        eq(zodb_unpickle(data), MinPO(54))
        oid2, version2 = self._setup_version('two')
        data, revid2 = self._storage.load(oid2, version2)
        eq(zodb_unpickle(data), MinPO(54))

        # Loads through the "wrong" version, or no version at all, see
        # the trunk data.
        for load_oid, load_version in ((oid1, version2),
                                       (oid2, version1),
                                       (oid1, '')):
            data, serial = self._storage.load(load_oid, load_version)
            eq(zodb_unpickle(data), MinPO(51))

        # Now commit version1 into version2.
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.commitVersion(version1, version2, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid1)
        # Both objects are now visible through version2.
        data, serial = self._storage.load(oid1, version2)
        eq(zodb_unpickle(data), MinPO(54))
        data, serial = self._storage.load(oid2, version2)
        eq(zodb_unpickle(data), MinPO(54))

        # An object can only exist in one version, so a load from
        # version1 now gives the non-version data...
        data, serial = self._storage.load(oid1, version1)
        eq(zodb_unpickle(data), MinPO(51))
        # ...as does a version that has never been used.
        data, serial = self._storage.load(oid1, 'bela lugosi')
        eq(zodb_unpickle(data), MinPO(51))

    def checkAbortOneVersionCommitTheOther(self):
        # Two objects, each modified in its own version; abort version
        # 'one', commit version 'two', and check what each load sees.
        eq = self.assertEqual
        oid1, version1 = self._setup_version('one')
        data, revid1 = self._storage.load(oid1, version1)
        eq(zodb_unpickle(data), MinPO(54))
        oid2, version2 = self._setup_version('two')
        data, revid2 = self._storage.load(oid2, version2)
        eq(zodb_unpickle(data), MinPO(54))

        # object1 is not visible through version2 -- a load there gives
        # the trunk data.
        data, revid2 = self._storage.load(oid1, version2)
        eq(zodb_unpickle(data), MinPO(51))

        # First, abort version1; only oid1 is affected.
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.abortVersion(version1, t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid1)
        data, revid = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(51))

        # NOTE(review): the next two load/eq pairs repeat the check
        # above verbatim -- presumably leftover duplication; harmless.
        data, revid = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(51))
        data, revid = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(51))

        # object2 is untouched: trunk still 51, version2 still 54.
        data, revid = self._storage.load(oid2, '')
        eq(zodb_unpickle(data), MinPO(51))
        data, revid = self._storage.load(oid2, version2)
        eq(zodb_unpickle(data), MinPO(54))
        # Okay, now let's commit version2 back to the trunk
        t = Transaction()
        self._storage.tpc_begin(t)
        oids = self._storage.commitVersion(version2, '', t)
        self._storage.tpc_vote(t)
        self._storage.tpc_finish(t)
        eq(len(oids), 1)
        eq(oids[0], oid2)
        # object1 is unaffected by committing version2.
        data, revid = self._storage.load(oid1, '')
        eq(zodb_unpickle(data), MinPO(51))

        # But the trunk should be up to date now
        data, revid = self._storage.load(oid2, '')
        eq(zodb_unpickle(data), MinPO(54))
        data, revid = self._storage.load(oid2, version2)
        eq(zodb_unpickle(data), MinPO(54))

        # A brand new object stored only in a version is not reachable
        # through the trunk or through a different version.
        oid = self._storage.new_oid()
        revid = self._dostore(oid, revid=revid, data=MinPO(54), version='one')
        self.assertRaises(KeyError,
                          self._storage.load, oid, '')
        self.assertRaises(KeyError,
                          self._storage.load, oid, 'two')