[Zodb-checkins] SVN: ZODB/branches/gocept-iteration/src/ZEO/ added iterator garbage collection

Thomas Lotze tl at gocept.com
Thu Feb 14 07:50:48 EST 2008


Log message for revision 83825:
  added iterator garbage collection

Changed:
  U   ZODB/branches/gocept-iteration/src/ZEO/ClientStorage.py
  U   ZODB/branches/gocept-iteration/src/ZEO/ServerStub.py
  U   ZODB/branches/gocept-iteration/src/ZEO/StorageServer.py
  A   ZODB/branches/gocept-iteration/src/ZEO/tests/IterationTests.py
  U   ZODB/branches/gocept-iteration/src/ZEO/tests/testZEO.py

-=-
Modified: ZODB/branches/gocept-iteration/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/ClientStorage.py	2008-02-14 10:31:24 UTC (rev 83824)
+++ ZODB/branches/gocept-iteration/src/ZEO/ClientStorage.py	2008-02-14 12:50:47 UTC (rev 83825)
@@ -262,6 +262,7 @@
         self._realm = realm
 
         self._iterators = weakref.WeakValueDictionary()
+        self._iterator_ids = set()
 
         # Flag tracking disconnections in the middle of a transaction.  This
         # is reset in tpc_begin() and set in notifyDisconnected().
@@ -1073,6 +1074,7 @@
             self._tbuf.clear()
             self._seriald.clear()
             del self._serials[:]
+            self._iterator_gc()
             self.end_transaction()
 
     def tpc_finish(self, txn, f=None):
@@ -1102,6 +1104,7 @@
             assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
         finally:
             self._load_lock.release()
+            self._iterator_gc()
             self.end_transaction()
 
     def _update_cache(self, tid):
@@ -1252,17 +1255,42 @@
         # iids are "iterator IDs" that can be used to query an iterator whose
         # status is held on the server.
         iid = self._server.iterator_start(start, stop)
-        iterator = self._iterators[iid] = self._iterator(iid, start, stop)
-        return iterator
+        return self._setup_iterator(self._iterator, iid)
 
-    def _iterator(self, iid, start, stop):
+    def _iterator(self, iid):
         while True:
             item = self._server.iterator_next(iid)
             if item is None:
+                # The iterator is exhausted, and the server has already
+                # disposed of it.
+                self._forget_iterator(iid)
                 break
             yield ClientStorageTransactionInformation(self, *item)
 
+    def _setup_iterator(self, factory, iid):
+        self._iterators[iid] = iterator = factory(iid)
+        self._iterator_ids.add(iid)
+        return iterator
 
+    def _forget_iterator(self, iid):
+        self._iterators.pop(iid, None)
+        self._iterator_ids.remove(iid)
+
+    def _iterator_gc(self):
+        iids = self._iterator_ids - set(self._iterators)
+        try:
+            self._server.iterator_gc(list(iids))
+        except ClientDisconnected:
+            # We could not successfully garbage-collect iterators.
+            # The server might have been restarted, so the IIDs might mean
+            # something different now. We simply forget our unused IIDs to
+            # avoid gc'ing foreign iterators.
+            # In the case that the server was not restarted, we accept the
+            # risk of leaking resources on the ZEO server.
+            pass
+        self._iterator_ids -= iids
+
+
 class ClientStorageTransactionInformation(ZODB.BaseStorage.TransactionRecord):
 
     def __init__(self, storage, tid, status, user, description, extension):
@@ -1276,12 +1304,14 @@
 
     def __iter__(self):
         riid = self._storage._server.iterator_record_start(self.tid)
-        iterator = self._storage._iterators[riid] = self._iterator(riid)
-        return iterator
+        return self._storage._setup_iterator(self._iterator, riid)
 
     def _iterator(self, riid):
         while True:
             item = self._storage._server.iterator_record_next(riid)
             if item is None:
+                # The iterator is exhausted, and the server has already
+                # disposed of it.
+                self._storage._forget_iterator(riid)
                 break
             yield ZODB.BaseStorage.DataRecord(*item)

Modified: ZODB/branches/gocept-iteration/src/ZEO/ServerStub.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/ServerStub.py	2008-02-14 10:31:24 UTC (rev 83824)
+++ ZODB/branches/gocept-iteration/src/ZEO/ServerStub.py	2008-02-14 12:50:47 UTC (rev 83825)
@@ -304,7 +304,10 @@
     def iterator_record_next(self, iid):
         return self.rpc.call('iterator_record_next', iid)
 
+    def iterator_gc(self, iids):
+        return self.rpc.call('iterator_gc', iids)
 
+
 class ExtensionMethodWrapper:
     def __init__(self, rpc, name):
         self.rpc = rpc

Modified: ZODB/branches/gocept-iteration/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/StorageServer.py	2008-02-14 10:31:24 UTC (rev 83824)
+++ ZODB/branches/gocept-iteration/src/ZEO/StorageServer.py	2008-02-14 12:50:47 UTC (rev 83825)
@@ -729,7 +729,11 @@
                     info.data_txn)
         return item
 
+    def iterator_gc(self, iids):
+        for iid in iids:
+            self._iterators.pop(iid, None)
 
+
 class StorageServerDB:
 
     def __init__(self, server, storage_id):

Added: ZODB/branches/gocept-iteration/src/ZEO/tests/IterationTests.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/tests/IterationTests.py	                        (rev 0)
+++ ZODB/branches/gocept-iteration/src/ZEO/tests/IterationTests.py	2008-02-14 12:50:47 UTC (rev 83825)
@@ -0,0 +1,74 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""ZEO iterator protocol tests."""
+
+import transaction
+from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
+
+
+class IterationTests:
+
+    def checkIteratorGCProtocol(self):
+        # Test garbage collection on protocol level.
+        server = self._storage._server
+
+        iid = server.iterator_start(None, None)
+        # None signals the end of iteration.
+        self.assertEquals(None, server.iterator_next(iid))
+        # The server has already disposed of the iterator.
+        self.assertRaises(KeyError, server.iterator_next, iid)
+
+        iid = server.iterator_start(None, None)
+        # This time, we tell the server to throw the iterator away.
+        server.iterator_gc([iid])
+        self.assertRaises(KeyError, server.iterator_next, iid)
+
+    def checkIteratorExhaustionStorage(self):
+        # Test the storage's garbage collection mechanism.
+        iterator = self._storage.iterator()
+        self.assertEquals(1, len(self._storage._iterator_ids))
+        iid = list(self._storage._iterator_ids)[0]
+        self.assertEquals([], list(iterator))
+        self.assertEquals(0, len(self._storage._iterator_ids))
+
+        # The iterator is exhausted, so the server has already disposed of it.
+        self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
+
+    def checkIteratorGCSpanTransactions(self):
+        iterator = self._storage.iterator()
+        self._dostore()
+        self.assertEquals([], list(iterator))
+
+    def checkIteratorGCStorageCommitting(self):
+        # We want the iterator to be garbage-collected, so we don't keep any
+        # hard references to it. The storage tracks its ID, though.
+        self._storage.iterator()
+
+        self.assertEquals(1, len(self._storage._iterator_ids))
+        iid = list(self._storage._iterator_ids)[0]
+
+        # GC happens at the transaction boundary. After that, both the storage
+        # and the server have forgotten the iterator.
+        self._dostore()
+        self.assertEquals(0, len(self._storage._iterator_ids))
+        self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
+
+    def checkIteratorGCStorageTPCAborting(self):
+        self._storage.iterator()
+        iid = list(self._storage._iterator_ids)[0]
+        t = transaction.Transaction()
+        self._storage.tpc_begin(t)
+        self._storage.tpc_abort(t)
+        self.assertEquals(0, len(self._storage._iterator_ids))
+        self.assertRaises(KeyError, self._storage._server.iterator_next, iid)


Property changes on: ZODB/branches/gocept-iteration/src/ZEO/tests/IterationTests.py
___________________________________________________________________
Name: svn:keywords
   + Id Rev Date
Name: svn:eol-style
   + native

Modified: ZODB/branches/gocept-iteration/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/tests/testZEO.py	2008-02-14 10:31:24 UTC (rev 83824)
+++ ZODB/branches/gocept-iteration/src/ZEO/tests/testZEO.py	2008-02-14 12:50:47 UTC (rev 83825)
@@ -49,7 +49,8 @@
 
 import ZEO.zrpc.connection
 
-from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
+from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests, \
+     IterationTests
 
 import ZEO.tests.ConnectionTests
 
@@ -179,8 +180,9 @@
     # ZEO test mixin classes (in the same order as imported)
     CommitLockTests.CommitLockVoteTests,
     ThreadTests.ThreadTests,
+    IterationTests.IterationTests,
     # Locally defined (see above)
-    MiscZEOTests
+    MiscZEOTests,
     ):
 
     """Combine tests from various origins in one class."""



More information about the Zodb-checkins mailing list