[Zope-Checkins] CVS: ZODB3/ZEO/tests - ConnectionTests.py:1.6

Guido van Rossum guido@python.org
Wed, 30 Oct 2002 16:44:25 -0500


Update of /cvs-repository/ZODB3/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv1686

Modified Files:
	ConnectionTests.py 
Log Message:
Fixed a few docstrings that were out of date.

Added a new test: checkMultiStorageTransaction().  This tests for the
deadlocks that we've seen when multiple appservers do transactions
involving multiple ZEO 2.0 storages.  It also nicely tests the timeout
feature that Jeremy added to StorageServer.

WARNING: with the current ZEO code, this occasionally hangs.  That's
the point of this test. :-)


=== ZODB3/ZEO/tests/ConnectionTests.py 1.5 => 1.6 ===
--- ZODB3/ZEO/tests/ConnectionTests.py:1.5	Fri Oct  4 20:35:25 2002
+++ ZODB3/ZEO/tests/ConnectionTests.py	Wed Oct 30 16:44:25 2002
@@ -18,7 +18,7 @@
 import socket
 import sys
 import tempfile
-import thread
+import threading
 import time
 
 import zLOG
@@ -27,30 +27,38 @@
 from ZEO.Exceptions import Disconnected
 from ZEO.zrpc.marshal import Marshaller
 
-from ZODB.Transaction import get_transaction
+from ZODB.Transaction import get_transaction, Transaction
 from ZODB.POSException import ReadOnlyError
-from ZODB.tests import StorageTestBase
-from ZODB.tests.StorageTestBase import zodb_unpickle, MinPO
+from ZODB.tests.StorageTestBase import StorageTestBase
+from ZODB.tests.MinPO import MinPO
+from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
+from ZODB.tests.StorageTestBase import handle_all_serials, ZERO
 
 class DummyDB:
     def invalidate(self, *args):
         pass
 
-class ConnectionTests(StorageTestBase.StorageTestBase):
+class ConnectionTests(StorageTestBase):
     """Tests that explicitly manage the server process.
 
     To test the cache or re-connection, these test cases explicit
     start and stop a ZEO storage server.
+
+    This must be subclassed; the subclass must provide implementations
+    of startServer() and shutdownServer().
     """
 
-    __super_tearDown = StorageTestBase.StorageTestBase.tearDown
+    __super_setUp = StorageTestBase.setUp
+    __super_tearDown = StorageTestBase.tearDown
 
     def setUp(self):
-        """Start a ZEO server using a Unix domain socket
+        """Test setup for connection tests.
 
-        The ZEO server uses the storage object returned by the
-        getStorage() method.
+        This starts only one server; a test may start more servers by
+        calling self._newAddr() and then self.startServer(index=i)
+        for i in 1, 2, ...
         """
+        self.__super_setUp()
         zLOG.LOG("testZEO", zLOG.INFO, "setUp() %s" % self.id())
         self.file = tempfile.mktemp()
         self.addr = []
@@ -70,15 +78,17 @@
         return 'localhost', random.randrange(25000, 30000, 2)
 
     def openClientStorage(self, cache='', cache_size=200000, wait=1,
-                          read_only=0, read_only_fallback=0):
-        base = ClientStorage(self.addr,
-                             client=cache,
-                             cache_size=cache_size,
-                             wait=wait,
-                             min_disconnect_poll=0.1,
-                             read_only=read_only,
-                             read_only_fallback=read_only_fallback)
-        storage = base
+                          read_only=0, read_only_fallback=0,
+                          addr=None):
+        if addr is None:
+            addr = self.addr
+        storage = ClientStorage(addr,
+                                client=cache,
+                                cache_size=cache_size,
+                                wait=wait,
+                                min_disconnect_poll=0.1,
+                                read_only=read_only,
+                                read_only_fallback=read_only_fallback)
         storage.registerDB(DummyDB(), None)
         return storage
 
@@ -376,7 +386,7 @@
                 self._dostore()
                 break
             except (Disconnected, ReadOnlyError,
-                    select.error, thread.error, socket.error):
+                    select.error, threading.ThreadError, socket.error):
                 time.sleep(0.1)
         else:
             self.fail("Couldn't store after starting a read-write server")
@@ -453,7 +463,8 @@
             try:
                 self._dostore(oid, data=obj)
                 break
-            except (Disconnected, select.error, thread.error, socket.error):
+            except (Disconnected, select.error,
+                    threading.ThreadError, socket.error):
                 zLOG.LOG("checkReconnection", zLOG.INFO,
                          "Error after server restart; retrying.",
                          error=sys.exc_info())
@@ -503,3 +514,107 @@
 
         self._storage = self.openClientStorage()
         self._dostore()
+
+    # Test case for multiple storages participating in a single
+    # transaction.  This is not really a connection test, but it needs
+    # about the same infrastructure (several storage servers).
+
+    # XXX WARNING: with the current ZEO code, this occasionally hangs.
+    # That's the point of this test. :-)
+
+    def checkMultiStorageTransaction(self):
+        # Configuration parameters (larger values mean more likely deadlocks)
+        self.nservers = 2
+        self.nthreads = 2
+        self.ntrans = 2
+        self.nobj = 2
+
+        # Start extra servers
+        for i in range(1, self.nservers):
+            self._newAddr()
+            self.startServer(index=i)
+
+        # Spawn threads that each do some transactions on all storages
+        threads = []
+        try:
+            for i in range(self.nthreads):
+                t = MSTThread(self, "T%d" % i)
+                threads.append(t)
+                t.start()
+            # Wait for all threads to finish
+            for t in threads:
+                t.join(60)
+                self.failIf(t.isAlive(), "%s didn't die" % t.getName())
+        finally:
+            for t in threads:
+                t.closeclients()
+
+class MSTThread(threading.Thread):
+
+    __super_init = threading.Thread.__init__
+
+    def __init__(self, testcase, name):
+        self.__super_init(name=name)
+        self.testcase = testcase
+        self.clients = []
+
+    def run(self):
+        tname = self.getName()
+        testcase = self.testcase
+
+        # Create client connections to each server
+        clients = self.clients
+        for i in range(len(testcase.addr)):
+            c = testcase.openClientStorage(addr=testcase.addr[i])
+            c.__name = "C%d" % i
+            clients.append(c)
+
+        for i in range(testcase.ntrans):
+            # Because we want a transaction spanning all storages,
+            # we can't use _dostore().  This is several _dostore() calls
+            # expanded in-line (mostly).
+
+            # Create oid->serial mappings
+            for c in clients:
+                c.__oids = []
+                c.__serials = {}
+
+            # Begin a transaction
+            t = Transaction()
+            for c in clients:
+                c.tpc_begin(t)
+
+            for j in range(testcase.nobj):
+                for c in clients:
+                    # Create and store a new object on each server
+                    oid = c.new_oid()
+                    c.__oids.append(oid)
+                    data = MinPO("%s.%s.t%d.o%d" % (tname, c.__name, i, j))
+                    data = zodb_pickle(data)
+                    s = c.store(oid, ZERO, data, '', t)
+                    c.__serials.update(handle_all_serials(oid, s))
+
+            # Vote on all servers and handle serials
+            for c in clients:
+                s = c.tpc_vote(t)
+                c.__serials.update(handle_all_serials(None, s))
+
+            # Finish on all servers
+            for c in clients:
+                c.tpc_finish(t)
+
+            for c in clients:
+                # Check that we got serials for all oids
+                for oid in c.__oids:
+                    testcase.failUnless(c.__serials.has_key(oid))
+                # Check that we got serials for no other oids
+                for oid in c.__serials.keys():
+                    testcase.failUnless(oid in c.__oids)
+
+    def closeclients(self):
+        # Close clients opened by run()
+        for c in self.clients:
+            try:
+                c.close()
+            except:
+                pass