[Zodb-checkins] CVS: StandaloneZODB/ZEO/tests - CommitLockTests.py:1.1.2.3

Jeremy Hylton jeremy@zope.com
Tue, 7 May 2002 00:43:15 -0400


Update of /cvs-repository/StandaloneZODB/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv17792/tests

Modified Files:
      Tag: ZEO2-branch
	CommitLockTests.py 
Log Message:
Add a multi-threaded test of the commit lock that actually queues
clients.



=== StandaloneZODB/ZEO/tests/CommitLockTests.py 1.1.2.2 => 1.1.2.3 ===
 
+import threading
+
 from ZODB.Transaction import Transaction
 from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
 
 import ZEO.ClientStorage
+from ZEO.Exceptions import Disconnected
 
 ZERO = '\0'*8
 
@@ -11,19 +14,68 @@
     def invalidate(self, *args):
         pass
 
+class WorkerThread(threading.Thread):
+
+    # run the entire test in a thread so that the blocking call for
+    # tpc_vote() doesn't hang the test suite.
+
+    def __init__(self, storage, trans, method="tpc_finish"):
+        self.storage = storage
+        self.trans = trans
+        self.method = method
+        threading.Thread.__init__(self)
+
+    def run(self):
+        try:
+            self.storage.tpc_begin(self.trans)
+        except Disconnected:
+            return
+        oid = self.storage.new_oid()
+        self.storage.store(oid, ZERO, zodb_pickle(MinPO("c")), '', self.trans)
+        oid = self.storage.new_oid()
+        self.storage.store(oid, ZERO, zodb_pickle(MinPO("c")), '', self.trans)
+        self.storage.tpc_vote(self.trans)
+        if self.method == "tpc_finish":
+            self.storage.tpc_finish(self.trans)
+        else:
+            self.storage.tpc_abort(self.trans)
+
 class CommitLockTests:
+
+    # The commit lock tests verify that the storage successfully
+    # blocks and restarts transactions when there is content for a
+    # single storage.  There are a lot of cases to cover.
+
+    # CommitLock1 checks the case where a single transaction delays
+    # other transactions before they actually block.  IOW, by the time
+    # the other transactions get to the vote stage, the first
+    # transaction has finished.
     
-    def checkCommitLockOnCommit(self):
+    def checkCommitLock1OnCommit(self):
+        self._storages = []
+        try:
+            self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
+        finally:
+            self._cleanup()
+
+    def checkCommitLock1OnAbort(self):
         self._storages = []
         try:
-            self._checkCommitLock("tpc_finish")
+            self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
         finally:
             self._cleanup()
 
-    def checkCommitLockOnAbort(self):
+    def checkCommitLock2OnCommit(self):
         self._storages = []
         try:
-            self._checkCommitLock("tpc_abort")
+            self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2)
+        finally:
+            self._cleanup()
+
+    def checkCommitLock2OnAbort(self):
+        self._storages = []
+        try:
+            self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2)
         finally:
             self._cleanup()
 
@@ -33,7 +85,7 @@
             store.close()
         self._storages = []
 
-    def _checkCommitLock(self, method_name):
+    def _checkCommitLock(self, method_name, dosetup, dowork):
         # check the commit lock when a client attemps a transaction,
         # but fails/exits before finishing the commit.
 
@@ -44,12 +96,11 @@
         # Start a second transaction on a different connection without
         # blocking the test thread.
         self._storages = []
-        for i in range(3):
+        for i in range(4):
             storage2 = self._duplicate_client()
             t2 = Transaction()
-            # ???
-            tid = `ZEO.ClientStorage.get_timestamp()`
-            storage2.tpc_begin(t2, tid)
+            tid = `ZEO.ClientStorage.get_timestamp()` # XXX why?
+            dosetup(storage2, t2, tid)
             if i == 0:
                 storage2.close()
             else:
@@ -64,17 +115,33 @@
         else:
             self._storage.tpc_abort(t)
 
-        self._dowork()
+        dowork(method_name)
 
         # Make sure the server is still responsive
         self._dostore()
 
-    def _dowork(self):
+    def _dosetup1(self, storage, trans, tid):
+        storage.tpc_begin(trans, tid)
+
+    def _dowork1(self, method_name):
         for store, trans in self._storages:
             oid = store.new_oid()
             store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
             store.tpc_vote(trans)
-            store.tpc_abort(trans)
+            if method_name == "tpc_finish":
+                store.tpc_finish(trans)
+            else:
+                store.tpc_abort(trans)
+
+    def _dosetup2(self, storage, trans, tid):
+        self._threads = []
+        t = WorkerThread(storage, trans)
+        self._threads.append(t)
+        t.start()
+
+    def _dowork2(self, method_name):
+        for t in self._threads:
+            t.join()
 
     def _duplicate_client(self):
         "Open another ClientStorage to the same server."