[Zodb-checkins] CVS: StandaloneZODB/ZEO/tests - CommitLockTests.py:1.1.2.1 testZEO.py:1.16.4.4.2.5

Jeremy Hylton jeremy@zope.com
Mon, 6 May 2002 17:08:22 -0400


Update of /cvs-repository/StandaloneZODB/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv20463/tests

Modified Files:
      Tag: ZEO2-branch
	testZEO.py 
Added Files:
      Tag: ZEO2-branch
	CommitLockTests.py 
Log Message:
Factor out the commit lock tests into a separate module.



=== Added File StandaloneZODB/ZEO/tests/CommitLockTests.py ===
"""Tests of the distributed commit lock."""

from ZODB.Transaction import Transaction
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO

import ZEO.ClientStorage

# Eight NUL bytes.  Passed below as the second argument to store() for
# freshly allocated oids (presumably the "no previous revision" serial;
# confirm against the storage API).
ZERO = '\0'*8

class DummyDB:
    """Stand-in database object whose invalidate() ignores its arguments."""

    def invalidate(self, *args):
        """Accept any positional arguments and do nothing."""
        return None

class CommitLockTests:
    """Mixin with checks of the commit lock shared by ZEO clients.

    The class defines no ``_storage`` attribute and no ``_dostore``
    method of its own; both are used below and must be supplied by the
    test case class this is mixed into.
    """

    def checkCommitLockOnCommit(self):
        """Run the commit lock scenario, finishing the first transaction."""
        # Initialise the list first so _cleanup() works even when the
        # scenario fails before any extra client has been opened.
        self._storages = []
        try:
            self._checkCommitLock("tpc_finish")
        finally:
            self._cleanup()

    def checkCommitLockOnAbort(self):
        """Run the commit lock scenario, aborting the first transaction."""
        self._storages = []
        try:
            self._checkCommitLock("tpc_abort")
        finally:
            self._cleanup()

    def _cleanup(self):
        """Abort and close every extra client recorded in self._storages.

        Each storage is closed even when its tpc_abort() raises, so a
        failing abort does not leave that client open.  The exception
        still propagates to the caller.
        """
        for store, trans in self._storages:
            try:
                store.tpc_abort(trans)
            finally:
                store.close()
        self._storages = []

    def _checkCommitLock(self, method_name):
        """Exercise the commit lock with several competing clients.

        method_name selects how the first transaction ends: the string
        "tpc_finish" finishes it (and then loads the stored object);
        any other value aborts it.
        """
        # check the commit lock when a client attempts a transaction,
        # but fails/exits before finishing the commit.

        # Start one transaction normally.
        t = Transaction()
        self._storage.tpc_begin(t)

        # Start a second transaction on a different connection without
        # blocking the test thread.
        self._storages = []
        for i in range(3):
            storage2 = self._duplicate_client()
            t2 = Transaction()
            # NOTE(review): the original author marked this spot "???".
            # An explicit tid is handed to tpc_begin(); presumably so
            # the call does not have to pick one itself -- confirm.
            tid = self._get_timestamp()
            storage2.tpc_begin(t2, tid)
            if i == 0:
                # Client 0 goes away right after tpc_begin, i.e. before
                # finishing its commit.
                storage2.close()
            else:
                print("storage %d opened" % i)
                self._storages.append((storage2, t2))

        print("finishing original commit")
        oid = self._storage.new_oid()
        self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
        self._storage.tpc_vote(t)
        if method_name == "tpc_finish":
            self._storage.tpc_finish(t)
            self._storage.load(oid, '')
        else:
            self._storage.tpc_abort(t)
        print("done")

        self._dowork()

        # Make sure the server is still responsive
        self._dostore()

    def _dowork(self):
        """Store, vote on and then abort one object per remaining client."""
        for store, trans in self._storages:
            oid = store.new_oid()
            store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
            store.tpc_vote(trans)
            store.tpc_abort(trans)

    def _duplicate_client(self):
        """Open another ClientStorage to the same server."""
        # XXX argh it's hard to find the actual address
        # The rpc mgr addr attribute is a list.  Each element in the
        # list is a socket domain (AF_INET, AF_UNIX, etc.) and an
        # address.
        addr = self._storage._rpc_mgr.addr[0][1]
        new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
        new.registerDB(DummyDB(), None)
        return new

    def _get_timestamp(self):
        """Return repr() of a fresh ZEO.ClientStorage.get_timestamp() value.

        The previous body built the value by hand from ``time`` and
        ``TimeStamp``, neither of which this module imports, so calling
        it raised NameError.
        """
        return repr(ZEO.ClientStorage.get_timestamp())



=== StandaloneZODB/ZEO/tests/testZEO.py 1.16.4.4.2.4 => 1.16.4.4.2.5 ===
 import sys
 import tempfile
+import thread
 import time
 import types
 import unittest
@@ -28,10 +29,10 @@
 import ThreadedAsync, ZEO.trigger
 from ZODB.FileStorage import FileStorage
 from ZODB.Transaction import Transaction
-import thread
+from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
 import zLOG
 
-from ZEO.tests import forker, Cache
+from ZEO.tests import forker, Cache, CommitLockTests
 from ZEO.smac import Disconnected
 
 # Sorry Jim...
@@ -42,8 +43,6 @@
 from ZODB.tests.MinPO import MinPO
 from ZODB.tests.StorageTestBase import zodb_unpickle
 
-ZERO = '\0'*8
-
 class DummyDB:
     def invalidate(self, *args):
         pass
@@ -72,6 +71,7 @@
                    Synchronization.SynchronizedStorage,
                    MTStorage.MTStorage,
                    ReadOnlyStorage.ReadOnlyStorage,
+                   CommitLockTests.CommitLockTests,
                    ):
     """An abstract base class for ZEO tests
 
@@ -102,75 +102,6 @@
     def checkLargeUpdate(self):
         obj = MinPO("X" * (10 * 128 * 1024))
         self._dostore(data=obj)
-
-    def checkCommitLockOnCommit(self):
-        try:
-            self._checkCommitLock("tpc_finish")
-        finally:
-            self._cleanup()
-
-    def checkCommitLockOnAbort(self):
-        try:
-            self._checkCommitLock("tpc_abort")
-        finally:
-            self._cleanup()
-
-    def _cleanup(self):
-        for store, trans in self._storages:
-            store.tpc_abort(trans)
-            store.close()
-        self._storages = []
-
-    def _checkCommitLock(self, method_name):
-        # check the commit lock when a client attemps a transaction,
-        # but fails/exits before finishing the commit.
-
-        # Start on transaction normally.
-        t = Transaction()
-        self._storage.tpc_begin(t)
-
-        # Start a second transaction on a different connection without
-        # blocking the test thread.
-        self._storages = []
-        for i in range(3):
-            storage2 = self._duplicate_client()
-            t2 = Transaction()
-            tid = `ZEO.ClientStorage.get_timestamp()`
-            try:
-                storage2._server.tpc_begin(tid, t2.user, t2.description,
-                                           t2._extension, None, ' ')
-            except Disconnected:
-                self.fail("client %d disconnected!" % i)
-            if i == 0:
-                storage2.close()
-            else:
-                self._storages.append((storage2, t2))
-
-        oid = self._storage.new_oid()
-        self._storage.store(oid, None, '', '', t)
-        self._storage.tpc_vote(t)
-        self._storage.tpc_finish(t)
-
-        self._cleanup()
-
-        # Make sure the server is still responsive
-        self._dostore()
-
-    def _duplicate_client(self):
-        "Open another ClientStorage to the same server."
-        # XXX argh it's hard to find the actual address
-        # The rpc mgr addr attribute is a list.  Each element in the
-        # list is a socket domain (AF_INET, AF_UNIX, etc.) and an
-        # address.
-        addr = self._storage._rpc_mgr.addr[0][1]
-        new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
-        new.registerDB(DummyDB(), None)
-        return new
-
-    def _get_timestamp(self):
-        t = time.time()
-        t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
-        return `t`
 
 class ZEOFileStorageTests(GenericTests):
     __super_setUp = GenericTests.setUp