[Zodb-checkins] SVN: ZODB/trunk/src/ Fixed a threading bug in the StorageServerDB implementation that cause

Jim Fulton jim at zope.com
Tue Mar 23 17:12:20 EDT 2010


Log message for revision 110118:
  Fixed a threading bug in the StorageServerDB implementation that caused
  the fan-out to fail.
  

Changed:
  U   ZODB/trunk/src/CHANGES.txt
  U   ZODB/trunk/src/ZEO/StorageServer.py
  U   ZODB/trunk/src/ZEO/tests/zeo-fan-out.test

-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt	2010-03-23 12:51:06 UTC (rev 110117)
+++ ZODB/trunk/src/CHANGES.txt	2010-03-23 21:12:19 UTC (rev 110118)
@@ -5,6 +5,12 @@
 3.10.0a2 (2010-??-??)
 =====================
 
+Bugs Fixed
+----------
+
+- When using a ClientStorage in a Storage server, there was a
+  threading bug that caused clients to get disconnected.
+
 New Features
 ------------
 

Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py	2010-03-23 12:51:06 UTC (rev 110117)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2010-03-23 21:12:19 UTC (rev 110118)
@@ -786,13 +786,6 @@
             raise StorageServerError("Versions aren't supported.")
         storage_id = self.storage_id
         self.server.invalidate(None, storage_id, tid, oids)
-        for zeo_server in self.server.connections.get(storage_id, ())[:]:
-            try:
-                zeo_server.connection.poll()
-            except ZEO.zrpc.error.DisconnectedError:
-                pass
-            else:
-                break # We only need to pull one :)
 
     def invalidateCache(self):
         self.server._invalidateCache(self.storage_id)

Modified: ZODB/trunk/src/ZEO/tests/zeo-fan-out.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/zeo-fan-out.test	2010-03-23 12:51:06 UTC (rev 110117)
+++ ZODB/trunk/src/ZEO/tests/zeo-fan-out.test	2010-03-23 21:12:19 UTC (rev 110118)
@@ -10,25 +10,29 @@
 We'll start the first server:
 
     >>> (_, port0), adminaddr0 = start_server(
-    ...     '<filestorage>\npath fs\n</filestorage>', keep=1)
+    ...     '<filestorage>\npath fs\nblob-dir blobs\n</filestorage>', keep=1)
 
 Then we'll start 2 others that use this one:
 
-    >>> addr1, _ = start_server('<zeoclient>\nserver %s\n</zeoclient>' % port0)
-    >>> addr2, _ = start_server('<zeoclient>\nserver %s\n</zeoclient>' % port0)
+    >>> addr1, _ = start_server(
+    ...   '<zeoclient>\nserver %s\nblob-dir b1\n</zeoclient>' % port0)
+    >>> addr2, _ = start_server(
+    ...   '<zeoclient>\nserver %s\nblob-dir b2\n</zeoclient>' % port0)
 
+
 Now, let's create some client storages that connect to these:
 
-    >>> import ZEO, transaction
+    >>> import os, ZEO, ZODB.blob, ZODB.POSException, transaction
 
-    >>> db1 = ZEO.DB(addr1)
+    >>> db0 = ZEO.DB(port0, blob_dir='cb0')
+    >>> db1 = ZEO.DB(addr1, blob_dir='cb1')
     >>> tm1 = transaction.TransactionManager()
     >>> c1 = db1.open(transaction_manager=tm1)
     >>> r1 = c1.root()
     >>> r1
     {}
 
-    >>> db2 = ZEO.DB(addr2)
+    >>> db2 = ZEO.DB(addr2, blob_dir='cb2')
     >>> tm2 = transaction.TransactionManager()
     >>> c2 = db2.open(transaction_manager=tm2)
     >>> r2 = c2.root()
@@ -43,7 +47,12 @@
     >>> r1[1].v = 1000
     >>> r1[2] = persistent.mapping.PersistentMapping()
     >>> r1[2].v = -1000
+    >>> r1[3] = ZODB.blob.Blob('x'*4111222)
+    >>> for i in range(1000, 2000):
+    ...     r1[i] = persistent.mapping.PersistentMapping()
+    ...     r1[i].v = 0
     >>> tm1.commit()
+    >>> blob_id = r1[3]._p_oid, r1[1]._p_serial
 
     >>> import time
     >>> for i in range(100):
@@ -51,7 +60,9 @@
     ...     if 1 in r2:
     ...         break
     ...     time.sleep(0.01)
-    
+    >>> tm2.abort()
+
+
     >>> r2[1].v
     1000
 
@@ -61,33 +72,60 @@
 Now, let's see if we can break it. :)
 
     >>> def f():
+    ...     c = db1.open(transaction.TransactionManager())
+    ...     r = c.root()
+    ...     i = 0
+    ...     while i < 100:
+    ...         r[1].v -= 1
+    ...         r[2].v += 1
+    ...         try:
+    ...             c.transaction_manager.commit()
+    ...             i += 1
+    ...         except ZODB.POSException.ConflictError:
+    ...             c.transaction_manager.abort()
+    ...     c.close()
+
+    >>> def g():
+    ...     c = db0.open(transaction.TransactionManager())
+    ...     r = c.root()
     ...     for i in range(100):
-    ...         r1[1].v -= 1
-    ...         r1[2].v += 1
-    ...         tm1.commit()
-    ...         time.sleep(0.01)
+    ...         for j in range(1000, 2000):
+    ...             r[j].v += 1
+    ...         c.transaction_manager.commit()
+    ...     c.close()
+
     >>> import threading
-    >>> thread = threading.Thread(target=f)
-    >>> thread.start()
+    >>> threadf = threading.Thread(target=f)
+    >>> threadg = threading.Thread(target=f)
+    >>> threadf.start()
 
-    >>> for i in range(1000):
+    >>> threadg.start()
+
+    >>> s2 = db2.storage
+    >>> start_time = time.time()
+    >>> while time.time() - start_time < 999:
     ...     t = tm2.begin()
     ...     if r2[1].v + r2[2].v:
     ...         print 'oops', r2[1], r2[2]
-    ...     if r1[1].v == 900:
+    ...     if r2[1].v == 900:
     ...         break # we caught up
-    ...     time.sleep(0.01)
+    ...     path = s2.fshelper.getBlobFilename(*blob_id)
+    ...     if os.path.exists(path):
+    ...         ZODB.blob.remove_committed(path)
+    ...     s2._server.sendBlob(*blob_id)
+    ... else: print 'Dang'
 
-    >>> thread.join()
-    
-    
+    >>> threadf.join()
+
+    >>> threadg.join()
+
 If we shutdown and restart the source server, the variables will be
 invalidated:
 
     >>> stop_server(adminaddr0)
     >>> _ = start_server('<filestorage 1>\npath fs\n</filestorage>\n',
     ...                  port=port0)
-    
+
     >>> for i in range(1000):
     ...     c1.sync()
     ...     c2.sync()
@@ -109,5 +147,6 @@
 
 Cleanup:
 
+    >>> db0.close()
     >>> db1.close()
     >>> db2.close()



More information about the Zodb-checkins mailing list