[Zope-Checkins] CVS: ZODB3/ZEO/tests - InvalidationTests.py:1.1.4.1 ConnectionTests.py:1.4.2.5

Jeremy Hylton jeremy@zope.com
Wed, 11 Jun 2003 19:12:02 -0400


Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv6722/ZEO/tests

Modified Files:
      Tag: ZODB3-3_1-branch
	ConnectionTests.py 
Added Files:
      Tag: ZODB3-3_1-branch
	InvalidationTests.py 
Log Message:
Merge tim-loading_oids_status-branch to ZODB 3.1 release branch.

Includes four critical bug fixes.


=== Added File ZODB3/ZEO/tests/InvalidationTests.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

import threading
import time

from BTrees.check import check, display
from BTrees.OOBTree import OOBTree

from ZEO.tests.TestThread import TestThread

from ZODB.DB import DB
from ZODB.POSException \
     import ReadConflictError, ConflictError, VersionLockError

class StressThread(TestThread):

    def __init__(self, testcase, db, stop, threadnum, startnum,
                 step=2, sleep=None):
        TestThread.__init__(self, testcase)
        self.db = db
        self.stop = stop
        self.threadnum = threadnum
        self.startnum = startnum
        self.step = step
        self.sleep = sleep
        self.added_keys = []

    def testrun(self):
        cn = self.db.open()
        while not self.stop.isSet():
            try:
                tree = cn.root()["tree"]
                break
            except (ConflictError, KeyError):
                get_transaction().abort()
                cn.sync()
        key = self.startnum
        while not self.stop.isSet():
            try:
                tree[key] = self.threadnum
                get_transaction().commit()
                if self.sleep:
                    time.sleep(self.sleep)
            except (ReadConflictError, ConflictError), msg:
                get_transaction().abort()
                # sync() is necessary here to process invalidations
                # if we get a read conflict.  In the read conflict case,
                # no objects were modified so cn never got registered
                # with the transaction.
                cn.sync()
            else:
                self.added_keys.append(key)
                key += self.step
        cn.close()

class VersionStressThread(TestThread):
    
    def __init__(self, testcase, db, stop, threadnum, startnum,
                 step=2, sleep=None):
        TestThread.__init__(self, testcase)
        self.db = db
        self.stop = stop
        self.threadnum = threadnum
        self.startnum = startnum
        self.step = step
        self.sleep = sleep
        self.added_keys = []

    def testrun(self):
        commit = 0
        key = self.startnum
        while not self.stop.isSet():
            version = "%s:%s" % (self.threadnum, key)
            commit = not commit
            if self.oneupdate(version, key, commit):
                self.added_keys.append(key)
            key += self.step

    def oneupdate(self, version, key, commit=1):
        # The mess of sleeps below were added to reduce the number
        # of VersionLockErrors, based on empirical observation.
        # It looks like the threads don't switch enough without
        # the sleeps.
        
        cn = self.db.open(version)
        while not self.stop.isSet():
            try:
                tree = cn.root()["tree"]
                break
            except (ConflictError, KeyError):
                get_transaction().abort()
                cn.sync()
        while not self.stop.isSet():
            try:
                tree[key] = self.threadnum
                get_transaction().commit()
                if self.sleep:
                    time.sleep(self.sleep)
                break
            except (VersionLockError, ReadConflictError, ConflictError), msg:
                get_transaction().abort()
                # sync() is necessary here to process invalidations
                # if we get a read conflict.  In the read conflict case,
                # no objects were modified so cn never got registered
                # with the transaction.
                cn.sync()
                if self.sleep:
                    time.sleep(self.sleep)
        try:
            while not self.stop.isSet():
                try:
                    if commit:
                        self.db.commitVersion(version)
                        get_transaction().note("commit version %s" % version)
                    else:
                        self.db.abortVersion(version)
                        get_transaction().note("abort version %s" % version)
                    get_transaction().commit()
                    if self.sleep:
                        time.sleep(self.sleep)
                    return commit
                except ConflictError, msg:
                    get_transaction().abort()
                    cn.sync()
        finally:
            cn.close()
        return 0

class InvalidationTests:

    level = 2
    DELAY = 15

    def _check_tree(self, cn, tree):
        # Make sure the BTree is sane and that all the updates persisted
        retries = 3
        while retries:
            retries -= 1
            try:
                check(tree)
                tree._check()
            except ReadConflictError:
                if retries:
                    get_transaction().abort()
                    cn.sync()
                else:
                    raise
            except:
                display(tree)
                raise

    def _check_threads(self, tree, *threads):
        # Make sure the thread's view of the world is consistent with
        # the actual database state.
        for t in threads:
            # If the test didn't add any keys, it didn't do what we expected.
            self.assert_(t.added_keys)
            for key in t.added_keys:
                self.assert_(tree.has_key(key))

    def go(self, stop, *threads):
        # Run the threads
        for t in threads:
            t.start()
        time.sleep(self.DELAY)
        stop.set()
        for t in threads:
            t.cleanup()
    
    def checkConcurrentUpdates2Storages(self):
        self._storage = storage1 = self.openClientStorage()
        storage2 = self.openClientStorage()
        db1 = DB(storage1)
        db2 = DB(storage2)
        stop = threading.Event()

        cn = db1.open()
        tree = cn.root()["tree"] = OOBTree()
        get_transaction().commit()

        # Run two threads that update the BTree
        t1 = StressThread(self, db1, stop, 1, 1)
        t2 = StressThread(self, db2, stop, 2, 2)
        self.go(stop, t1, t2)

        cn.sync()
        self._check_tree(cn, tree)
        self._check_threads(tree, t1, t2)

        cn.close()
        db1.close()
        db2.close()

    def checkConcurrentUpdates1Storage(self):
        self._storage = storage1 = self.openClientStorage()
        db1 = DB(storage1)
        stop = threading.Event()

        cn = db1.open()
        tree = cn.root()["tree"] = OOBTree()
        get_transaction().commit()

        # Run two threads that update the BTree
        t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
        t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
        self.go(stop, t1, t2)

        cn.sync()
        self._check_tree(cn, tree)
        self._check_threads(tree, t1, t2)

        cn.close()
        db1.close()

    def checkConcurrentUpdates2StoragesMT(self):
        self._storage = storage1 = self.openClientStorage()
        db1 = DB(storage1)
        db2 = DB(self.openClientStorage())
        stop = threading.Event()

        cn = db1.open()
        tree = cn.root()["tree"] = OOBTree()
        get_transaction().commit()

        # Run three threads that update the BTree.
        # Two of the threads share a single storage so that it
        # is possible for both threads to read the same object
        # at the same time.
        
        t1 = StressThread(self, db1, stop, 1, 1, 3)
        t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
        t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
        self.go(stop, t1, t2, t3)

        cn.sync()
        self._check_tree(cn, tree)
        self._check_threads(tree, t1, t2, t3)

        cn.close()
        db1.close()
        db2.close()

    def checkConcurrentUpdatesInVersions(self):
        self._storage = storage1 = self.openClientStorage()
        db1 = DB(storage1)
        db2 = DB(self.openClientStorage())
        stop = threading.Event()

        cn = db1.open()
        tree = cn.root()["tree"] = OOBTree()
        get_transaction().commit()

        # Run three threads that update the BTree.
        # Two of the threads share a single storage so that it
        # is possible for both threads to read the same object
        # at the same time.
        
        t1 = VersionStressThread(self, db1, stop, 1, 1, 3)
        t2 = VersionStressThread(self, db2, stop, 2, 2, 3, 0.001)
        t3 = VersionStressThread(self, db2, stop, 3, 3, 3, 0.001)
        self.go(stop, t1, t2, t3)

        cn.sync()
        self._check_tree(cn, tree)
        self._check_threads(tree, t1, t2, t3)

        cn.close()
        db1.close()
        db2.close()



=== ZODB3/ZEO/tests/ConnectionTests.py 1.4.2.4 => 1.4.2.5 ===
--- ZODB3/ZEO/tests/ConnectionTests.py:1.4.2.4	Tue Apr 29 17:39:56 2003
+++ ZODB3/ZEO/tests/ConnectionTests.py	Wed Jun 11 19:11:32 2003
@@ -26,6 +26,7 @@
 from ZEO.ClientStorage import ClientStorage, ClientDisconnected
 from ZEO.Exceptions import Disconnected
 from ZEO.zrpc.marshal import Marshaller
+from ZEO.tests.InvalidationTests import InvalidationTests
 
 from ZODB.Transaction import get_transaction, Transaction
 from ZODB.POSException import ReadOnlyError
@@ -37,7 +38,8 @@
     def invalidate(self, *args):
         pass
 
-class ConnectionTests(StorageTestBase.StorageTestBase):
+class ConnectionTests(StorageTestBase.StorageTestBase,
+                      InvalidationTests):
     """Tests that explicitly manage the server process.
 
     To test the cache or re-connection, these test cases explicit
@@ -53,7 +55,7 @@
         getStorage() method.
         """
         zLOG.LOG("testZEO", zLOG.INFO, "setUp() %s" % self.id())
-        self.file = tempfile.mktemp()
+        self.file = tempfile.mktemp(".fs")
         self.addr = []
         self._pids = []
         self._servers = []
@@ -574,3 +576,4 @@
         txn = Transaction()
         storage.tpc_begin(txn)
         storage.tpc_abort(txn)
+