[Zope-Checkins] CVS: Zope/lib/python/ZEO/tests - deadlock.py:1.1.6.1 CommitLockTests.py:1.9.8.2 ConnectionTests.py:1.5.2.2

Chris McDonough chrism@zope.com
Sun, 24 Nov 2002 18:55:57 -0500


Update of /cvs-repository/Zope/lib/python/ZEO/tests
In directory cvs.zope.org:/tmp/cvs-serv13982/tests

Modified Files:
      Tag: chrism-install-branch
	CommitLockTests.py ConnectionTests.py 
Added Files:
      Tag: chrism-install-branch
	deadlock.py 
Log Message:
Merge with HEAD.


=== Added File Zope/lib/python/ZEO/tests/deadlock.py ===
import ZODB
from ZODB.POSException import ConflictError
from ZEO.ClientStorage import ClientStorage, ClientDisconnected
from ZEO.zrpc.error import DisconnectedError

import os
import random
import time

# Pool of candidate keys (the integers 1 through 99); update() draws
# one at random for each of the two roots it modifies.
L = range(1, 100)

def main():
    z1 = ClientStorage(('localhost', 2001), wait=1)
    z2 = ClientStorage(('localhost', 2002), wait=2)
    db1 = ZODB.DB(z1)
    db2 = ZODB.DB(z2)
    c1 = db1.open()
    c2 = db2.open()
    r1 = c1.root()
    r2 = c2.root()

    while 1:
        try:
            try:
                update(r1, r2)
            except ConflictError, msg:
                print msg
                get_transaction().abort()
                c1.sync()
                c2.sync()
        except (ClientDisconnected, DisconnectedError), err:
            print "disconnected", err
            time.sleep(2)

def update(r1, r2):
    k1 = random.choice(L)
    k2 = random.choice(L)

    updates = [(k1, r1),
               (k2, r2)]
    random.shuffle(updates)
    for key, root in updates:
        root[key] = time.time()
    get_transaction().commit()
    print os.getpid(), k1, k2

# Start the endless update loop only when this file is run as a script.
if __name__ == "__main__":
    main()


=== Zope/lib/python/ZEO/tests/CommitLockTests.py 1.9.8.1 => 1.9.8.2 ===
--- Zope/lib/python/ZEO/tests/CommitLockTests.py:1.9.8.1	Tue Oct  8 20:41:42 2002
+++ Zope/lib/python/ZEO/tests/CommitLockTests.py	Sun Nov 24 18:55:26 2002
@@ -124,9 +124,12 @@
         # started, but before it finishes.  The dowork() function
         # executes after the first transaction has completed.
 
-        # Start on transaction normally.
+        # Start on transaction normally and get the lock.
         t = Transaction()
         self._storage.tpc_begin(t)
+        oid = self._storage.new_oid()
+        self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
+        self._storage.tpc_vote(t)
 
         # Start a second transaction on a different connection without
         # blocking the test thread.
@@ -141,9 +144,6 @@
             else:
                 self._storages.append((storage2, t2))
 
-        oid = self._storage.new_oid()
-        self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
-        self._storage.tpc_vote(t)
         if method_name == "tpc_finish":
             self._storage.tpc_finish(t)
             self._storage.load(oid, '')


=== Zope/lib/python/ZEO/tests/ConnectionTests.py 1.5.2.1 => 1.5.2.2 ===
--- Zope/lib/python/ZEO/tests/ConnectionTests.py:1.5.2.1	Tue Oct  8 20:41:42 2002
+++ Zope/lib/python/ZEO/tests/ConnectionTests.py	Sun Nov 24 18:55:26 2002
@@ -18,7 +18,7 @@
 import socket
 import sys
 import tempfile
-import thread
+import threading
 import time
 
 import zLOG
@@ -27,30 +27,38 @@
 from ZEO.Exceptions import Disconnected
 from ZEO.zrpc.marshal import Marshaller
 
-from ZODB.Transaction import get_transaction
+from ZODB.Transaction import get_transaction, Transaction
 from ZODB.POSException import ReadOnlyError
-from ZODB.tests import StorageTestBase
-from ZODB.tests.StorageTestBase import zodb_unpickle, MinPO
+from ZODB.tests.StorageTestBase import StorageTestBase
+from ZODB.tests.MinPO import MinPO
+from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
+from ZODB.tests.StorageTestBase import handle_all_serials, ZERO
 
 class DummyDB:
     def invalidate(self, *args):
         pass
 
-class ConnectionTests(StorageTestBase.StorageTestBase):
+class ConnectionTests(StorageTestBase):
     """Tests that explicitly manage the server process.
 
     To test the cache or re-connection, these test cases explicit
     start and stop a ZEO storage server.
+
+    This must be subclassed; the subclass must provide implementations
+    of startServer() and shutdownServer().
     """
 
-    __super_tearDown = StorageTestBase.StorageTestBase.tearDown
+    __super_setUp = StorageTestBase.setUp
+    __super_tearDown = StorageTestBase.tearDown
 
     def setUp(self):
-        """Start a ZEO server using a Unix domain socket
+        """Test setup for connection tests.
 
-        The ZEO server uses the storage object returned by the
-        getStorage() method.
+        This starts only one server; a test may start more servers by
+        calling self._newAddr() and then self.startServer(index=i)
+        for i in 1, 2, ...
         """
+        self.__super_setUp()
         zLOG.LOG("testZEO", zLOG.INFO, "setUp() %s" % self.id())
         self.file = tempfile.mktemp()
         self.addr = []
@@ -70,15 +78,17 @@
         return 'localhost', random.randrange(25000, 30000, 2)
 
     def openClientStorage(self, cache='', cache_size=200000, wait=1,
-                          read_only=0, read_only_fallback=0):
-        base = ClientStorage(self.addr,
-                             client=cache,
-                             cache_size=cache_size,
-                             wait=wait,
-                             min_disconnect_poll=0.1,
-                             read_only=read_only,
-                             read_only_fallback=read_only_fallback)
-        storage = base
+                          read_only=0, read_only_fallback=0,
+                          addr=None):
+        if addr is None:
+            addr = self.addr
+        storage = ClientStorage(addr,
+                                client=cache,
+                                cache_size=cache_size,
+                                wait=wait,
+                                min_disconnect_poll=0.1,
+                                read_only=read_only,
+                                read_only_fallback=read_only_fallback)
         storage.registerDB(DummyDB(), None)
         return storage
 
@@ -376,7 +386,7 @@
                 self._dostore()
                 break
             except (Disconnected, ReadOnlyError,
-                    select.error, thread.error, socket.error):
+                    select.error, threading.ThreadError, socket.error):
                 time.sleep(0.1)
         else:
             self.fail("Couldn't store after starting a read-write server")
@@ -453,7 +463,8 @@
             try:
                 self._dostore(oid, data=obj)
                 break
-            except (Disconnected, select.error, thread.error, socket.error):
+            except (Disconnected, select.error,
+                    threading.ThreadError, socket.error):
                 zLOG.LOG("checkReconnection", zLOG.INFO,
                          "Error after server restart; retrying.",
                          error=sys.exc_info())
@@ -503,3 +514,113 @@
 
         self._storage = self.openClientStorage()
         self._dostore()
+
+    # Test case for multiple storages participating in a single
+    # transaction.  This is not really a connection test, but it needs
+    # about the same infrastructure (several storage servers).
+
+    # XXX WARNING: with the current ZEO code, this occasionally fails.
+    # That's the point of this test. :-)
+
+    def NOcheckMultiStorageTransaction(self):
+        # Configuration parameters (larger values mean more likely deadlocks)
+        N = 2
+        # These don't *have* to be all the same, but it's convenient this way
+        self.nservers = N
+        self.nthreads = N
+        self.ntrans = N
+        self.nobj = N
+
+        # Start extra servers
+        for i in range(1, self.nservers):
+            self._newAddr()
+            self.startServer(index=i)
+
+        # Spawn threads that each do some transactions on all storages
+        threads = []
+        try:
+            for i in range(self.nthreads):
+                t = MSTThread(self, "T%d" % i)
+                threads.append(t)
+                t.start()
+            # Wait for all threads to finish
+            for t in threads:
+                t.join(60)
+                self.failIf(t.isAlive(), "%s didn't die" % t.getName())
+        finally:
+            for t in threads:
+                t.closeclients()
+
+class MSTThread(threading.Thread):
+
+    __super_init = threading.Thread.__init__
+
+    def __init__(self, testcase, name):
+        self.__super_init(name=name)
+        self.testcase = testcase
+        self.clients = []
+
+    def run(self):
+        tname = self.getName()
+        testcase = self.testcase
+
+        # Create client connections to each server
+        clients = self.clients
+        for i in range(len(testcase.addr)):
+            c = testcase.openClientStorage(addr=testcase.addr[i])
+            c.__name = "C%d" % i
+            clients.append(c)
+
+        for i in range(testcase.ntrans):
+            # Because we want a transaction spanning all storages,
+            # we can't use _dostore().  This is several _dostore() calls
+            # expanded in-line (mostly).
+
+            # Create oid->serial mappings
+            for c in clients:
+                c.__oids = []
+                c.__serials = {}
+
+            # Begin a transaction
+            t = Transaction()
+            for c in clients:
+                #print "%s.%s.%s begin\n" % (tname, c.__name, i),
+                c.tpc_begin(t)
+
+            for j in range(testcase.nobj):
+                for c in clients:
+                    # Create and store a new object on each server
+                    oid = c.new_oid()
+                    c.__oids.append(oid)
+                    data = MinPO("%s.%s.t%d.o%d" % (tname, c.__name, i, j))
+                    #print data.value
+                    data = zodb_pickle(data)
+                    s = c.store(oid, ZERO, data, '', t)
+                    c.__serials.update(handle_all_serials(oid, s))
+
+            # Vote on all servers and handle serials
+            for c in clients:
+                #print "%s.%s.%s vote\n" % (tname, c.__name, i),
+                s = c.tpc_vote(t)
+                c.__serials.update(handle_all_serials(None, s))
+
+            # Finish on all servers
+            for c in clients:
+                #print "%s.%s.%s finish\n" % (tname, c.__name, i),
+                c.tpc_finish(t)
+
+            for c in clients:
+                # Check that we got serials for all oids
+                for oid in c.__oids:
+                    testcase.failUnless(c.__serials.has_key(oid))
+                # Check that we got serials for no other oids
+                for oid in c.__serials.keys():
+                    testcase.failUnless(oid in c.__oids)
+
+    def closeclients(self):
+        # Close clients opened by run()
+        for c in self.clients:
+            try:
+                c.close()
+            except:
+                pass