[Zodb-checkins] SVN: ZODB/trunk/src/ZEO/ Updated to support the registerDB framework which allows storages to

Jim Fulton jim at zope.com
Fri May 11 16:04:15 EDT 2007


Log message for revision 75688:
  Updated to support the registerDB framework which allows storages to
  generate their own invalidations.  Also updated to honor the storage
  APIs more carefully.  These changes together allow a ClientStorage to
  be served by a storage server.
  

Changed:
  U   ZODB/trunk/src/ZEO/StorageServer.py
  A   ZODB/trunk/src/ZEO/tests/registerDB.test
  U   ZODB/trunk/src/ZEO/tests/testZEO.py
  A   ZODB/trunk/src/ZEO/tests/zeo-fan-out.test

-=-
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py	2007-05-11 20:04:07 UTC (rev 75687)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2007-05-11 20:04:14 UTC (rev 75688)
@@ -31,6 +31,9 @@
 
 import transaction
 
+import ZODB.serialize
+import ZEO.zrpc.error
+
 from ZEO import ClientStub
 from ZEO.CommitLog import CommitLog
 from ZEO.monitor import StorageStats, StatsServer
@@ -625,24 +628,37 @@
                 self.log(msg, logging.ERROR)
                 err = StorageServerError(msg)
             # The exception is reported back as newserial for this oid
-            newserial = err
+            newserial = [(oid, err)]
         else:
             if serial != "\0\0\0\0\0\0\0\0":
                 self.invalidated.append((oid, version))
-        if newserial == ResolvedSerial:
-            self.stats.conflicts_resolved += 1
-            self.log("conflict resolved oid=%s" % oid_repr(oid), BLATHER)
-        self.serials.append((oid, newserial))
+
+            if isinstance(newserial, str):
+                newserial = [(oid, newserial)]
+
+        if newserial:
+            for oid, s in newserial:
+
+                if s == ResolvedSerial:
+                    self.stats.conflicts_resolved += 1
+                    self.log("conflict resolved oid=%s"
+                             % oid_repr(oid), BLATHER)
+
+                self.serials.append((oid, s))
+
         return err is None
 
     def _vote(self):
+        if not self.store_failed:
+            # Only call tpc_vote if no store call failed, otherwise
+            # the serialnos() call will deliver an exception that will be
+            # handled by the client in its tpc_vote() method.
+            serials = self.storage.tpc_vote(self.transaction)
+            if serials:
+                self.serials.extend(serials)
+
         self.client.serialnos(self.serials)
-        # If a store call failed, then return to the client immediately.
-        # The serialnos() call will deliver an exception that will be
-        # handled by the client in its tpc_vote() method.
-        if self.store_failed:
-            return
-        return self.storage.tpc_vote(self.transaction)
+        return
 
     def _abortVersion(self, src):
         tid, oids = self.storage.abortVersion(src, self.transaction)
@@ -741,7 +757,31 @@
         else:
             return 1
 
+class StorageServerDB:
 
+    def __init__(self, server, storage_id):
+        self.server = server
+        self.storage_id = storage_id
+        self.references = ZODB.serialize.referencesf
+
+    def invalidate(self, tid, oids, version=''):
+        storage_id = self.storage_id
+        self.server.invalidate(
+            None, storage_id, tid,
+            [(oid, version) for oid in oids],
+            )
+        for zeo_server in self.server.connections.get(storage_id, ())[:]:
+            try:
+                zeo_server.connection.poll()
+            except ZEO.zrpc.error.DisconnectedError:
+                pass
+            else:
+                break # We only need to poll one :)
+
+    def invalidateCache(self):
+        self.server._invalidateCache(self.storage_id)
+        
+
 class StorageServer:
 
     """The server side implementation of ZEO.
@@ -845,17 +885,12 @@
         # The list is kept in sorted order with the most recent
         # invalidation at the front.  The list never has more than
         # self.invq_bound elements.
+        self.invq_bound = invalidation_queue_size
         self.invq = {}
         for name, storage in storages.items():
-            lastInvalidations = getattr(storage, 'lastInvalidations', None)
-            if lastInvalidations is None:
-                self.invq[name] = [(storage.lastTransaction(), None)]
-            else:
-                self.invq[name] = list(
-                    lastInvalidations(invalidation_queue_size)
-                    )
-                self.invq[name].reverse()
-        self.invq_bound = invalidation_queue_size
+            self._setup_invq(name, storage)
+            storage.registerDB(StorageServerDB(self, name))
+
         self.connections = {}
         self.dispatcher = self.DispatcherClass(addr,
                                                factory=self.new_connection)
@@ -875,6 +910,17 @@
         else:
             self.monitor = None
 
+    def _setup_invq(self, name, storage):
+        lastInvalidations = getattr(storage, 'lastInvalidations', None)
+        if lastInvalidations is None:
+            self.invq[name] = [(storage.lastTransaction(), None)]
+        else:
+            self.invq[name] = list(
+                lastInvalidations(self.invq_bound)
+                )
+            self.invq[name].reverse()
+
+
     def _setup_auth(self, protocol):
         # Can't be done in global scope, because of cyclic references
         from ZEO.auth import get_module
@@ -947,6 +993,49 @@
         stats.clients += 1
         return self.timeouts[storage_id], stats
 
+    def _invalidateCache(self, storage_id):
+        """We need to invalidate any caches we have.
+
+        This basically means telling our clients to
+        invalidate/revalidate their caches. We do this by closing them
+        and making them reconnect.
+        """
+
+        # This method can be called from foreign threads.  We have to
+        # worry about interaction with the main thread.
+
+        # 1. We modify self.invq which is read by get_invalidations
+        #    below. This is why get_invalidations makes a copy of
+        #    self.invq.
+
+        # 2. We access connections.  There are two dangers:
+        #
+        # a. We miss a new connection.  This is not a problem because
+        #    if a client connects after we get the list of connections,
+        #    then it will have to read the invalidation queue, which
+        #    has already been reset.
+        #
+        # b. A connection is closed while we are iterating.  This
+        #    doesn't matter, because we can call should_close on a closed
+        #    connection.
+
+        # Rebuild invq
+        self._setup_invq(storage_id, self.storages[storage_id])
+
+        connections = self.connections.get(storage_id, ())
+
+        # Make a copy since we are going to be mutating the
+        # connections indirectly by closing them.  We don't care about
+        # later transactions since they will have to validate their
+        # caches anyway.
+        connections = connections[:]        
+        for p in connections:
+            try:
+                p.connection.should_close()
+            except ZEO.zrpc.error.DisconnectedError:
+                pass
+        
+
     def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
         """Internal: broadcast info and invalidations to clients.
 
@@ -972,6 +1061,27 @@
 
         """
 
+        # This method can be called from foreign threads.  We have to
+        # worry about interaction with the main thread.
+
+        # 1. We modify self.invq which is read by get_invalidations
+        #    below. This is why get_invalidations makes a copy of
+        #    self.invq.
+
+        # 2. We access connections.  There are two dangers:
+        #
+        # a. We miss a new connection.  This is not a problem because
+        #    we are called while the storage lock is held.  A new
+        #    connection that tries to read data won't read committed
+        #    data without first receiving an invalidation.  Also, if a
+        #    client connects after getting the list of connections,
+        #    then it will have to read the invalidation queue, which
+        #    has been updated to reflect the invalidations.
+        #
+        # b. A connection is closed while we are iterating. We'll need
+        #    to catch and ignore Disconnected errors.
+        
+
         if invalidated:
             invq = self.invq[storage_id]
             if len(invq) >= self.invq_bound:
@@ -980,7 +1090,11 @@
 
         for p in self.connections.get(storage_id, ()):
             if invalidated and p is not conn:
-                p.client.invalidateTransaction(tid, invalidated)
+                try:
+                    p.client.invalidateTransaction(tid, invalidated)
+                except ZEO.zrpc.error.DisconnectedError:
+                    pass
+
             elif info is not None:
                 p.client.info(info)
 
@@ -994,7 +1108,13 @@
         do full cache verification.
         """
 
+        
         invq = self.invq[storage_id]
+
+        # We make a copy of invq because it might be modified by a
+        # foreign (other than main) thread calling invalidate above.
+        invq = invq[:]
+
         if not invq:
             log("invq empty")
             return None, []

Copied: ZODB/trunk/src/ZEO/tests/registerDB.test (from rev 75684, ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test)

Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py	2007-05-11 20:04:07 UTC (rev 75687)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py	2007-05-11 20:04:14 UTC (rev 75688)
@@ -26,6 +26,9 @@
 import unittest
 import shutil
 
+import zope.testing.setupstack
+from zope.testing import doctest
+
 # ZODB test support
 import ZODB
 import ZODB.tests.util
@@ -150,11 +153,13 @@
         self._servers = [adminaddr]
         self._conf_path = path
         if not self.blob_cache_dir:
-            self.blob_cache_dir = tempfile.mkdtemp()  # This is the blob cache for ClientStorage
-        self._storage = ClientStorage(zport, '1', cache_size=20000000,
-                                      min_disconnect_poll=0.5, wait=1,
-                                      wait_timeout=60, blob_dir=self.blob_cache_dir,
-                                      blob_cache_writable=self.blob_cache_writable)
+            # This is the blob cache for ClientStorage
+            self.blob_cache_dir = tempfile.mkdtemp()
+        self._storage = ClientStorage(
+            zport, '1', cache_size=20000000,
+            min_disconnect_poll=0.5, wait=1,
+            wait_timeout=60, blob_dir=self.blob_cache_dir,
+            blob_cache_writable=self.blob_cache_writable)
         self._storage.registerDB(DummyDB())
 
     def tearDown(self):
@@ -816,10 +821,20 @@
                 BlobAdaptedFileStorageTests, BlobWritableCacheTests]
 
 
+def zeoFanOutSetup(test):
+    zope.testing.setupstack.setUpDirectory(test)
+
 def test_suite():
     suite = unittest.TestSuite()
     suite.addTest(doctest.DocTestSuite(setUp=ZODB.tests.util.setUp,
                                        tearDown=ZODB.tests.util.tearDown))
+    suite.addTest(doctest.DocFileSuite('registerDB.test'))
+    suite.addTest(
+        doctest.DocFileSuite('zeo-fan-out.test',
+                             setUp=zeoFanOutSetup,
+                             tearDown=zope.testing.setupstack.tearDown,
+                             ),
+        )
     for klass in test_classes:
         sub = unittest.makeSuite(klass, "check")
         suite.addTest(sub)

Copied: ZODB/trunk/src/ZEO/tests/zeo-fan-out.test (from rev 75684, ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test)



More information about the Zodb-checkins mailing list