[Zodb-checkins] SVN: ZODB/trunk/src/ ZEO Servers now provide an option, invalidation-age, that allows

Jim Fulton jim at zope.com
Fri Jan 2 16:22:40 EST 2009


Log message for revision 94461:
  ZEO Servers now provide an option, invalidation-age, that allows
  quick verification of ZEO clients less than a given age even if the
  number of transactions the client hasn't seen exceeds the
  invalidation queue size. This is only recommended if the storage
  being served supports efficient iteration from a point near the end
  of the transaction history.
  
  Also refactored ZEO/tests/zeoserver and ZEO/runzeo to have the test
  server use, and thus exercise, the option-handling code from runzeo.
  

Changed:
  U   ZODB/trunk/src/CHANGES.txt
  U   ZODB/trunk/src/ZEO/StorageServer.py
  U   ZODB/trunk/src/ZEO/component.xml
  U   ZODB/trunk/src/ZEO/runzeo.py
  U   ZODB/trunk/src/ZEO/tests/forker.py
  A   ZODB/trunk/src/ZEO/tests/invalidation-age.txt
  U   ZODB/trunk/src/ZEO/tests/testZEO.py
  U   ZODB/trunk/src/ZEO/tests/zeoserver.py

-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt	2009-01-02 19:36:41 UTC (rev 94460)
+++ ZODB/trunk/src/CHANGES.txt	2009-01-02 21:22:39 UTC (rev 94461)
@@ -2,7 +2,7 @@
  Change History
 ================
 
-3.9.0 (2008-??-??)
+3.9.0 (2009-??-??)
 ====================
 
 New Features
@@ -22,7 +22,7 @@
   XXX There are known issues with this implementation that need to be
   sorted out before it is "released".
 
-3.9.0a9 (2008-12-??)
+3.9.0a9 (2009-01-??)
 ====================
 
 New Features
@@ -42,10 +42,18 @@
 - As a small convenience (mainly for tests), you can now specify
   initial data as a string argument to the Blob constructor.
 
-- The FileStorage iterator now handles large files better.  Whenm
+- ZEO Servers now provide an option, invalidation-age, that allows
+  quick verification of ZEO clients less than a given age even if the
+  number of transactions the client hasn't seen exceeds the
+  invalidation queue size. This is only recommended if the storage
+  being served supports efficient iteration from a point near the end
+  of the transaction history.
+
+- The FileStorage iterator now handles large files better.  When
   iteratng from a starting transaction near the end of the file, the
   iterator will scan backward from the end of the file to find the
-  starting point.
+  starting point.  This enhancement makes it practical to take
+  advantage of the new storage server invalidation-age option.
 
 3.9.0a8 (2008-12-15)
 ====================

Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py	2009-01-02 19:36:41 UTC (rev 94460)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2009-01-02 21:22:39 UTC (rev 94461)
@@ -34,6 +34,7 @@
 import transaction
 
 import ZODB.serialize
+import ZODB.TimeStamp
 import ZEO.zrpc.error
 
 import zope.interface
@@ -48,7 +49,7 @@
 from ZODB.POSException import StorageError, StorageTransactionError
 from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
 from ZODB.serialize import referencesf
-from ZODB.utils import u64, oid_repr, mktemp
+from ZODB.utils import u64, p64, oid_repr, mktemp
 from ZODB.loglevels import BLATHER
 
 
@@ -817,6 +818,7 @@
 
     def __init__(self, addr, storages, read_only=0,
                  invalidation_queue_size=100,
+                 invalidation_age=None,
                  transaction_timeout=None,
                  monitor_address=None,
                  auth_protocol=None,
@@ -854,6 +856,13 @@
             speed client cache verification when a client disconnects
             for a short period of time.
 
+        invalidation_age --
+            If the invalidation queue isn't big enough to support a
+            quick verification, but the last transaction seen by a
+            client is younger than the invalidation age, then
+            invalidations will be computed by iterating over
+            transactions later than the given transaction.
+
         transaction_timeout -- The maximum amount of time to wait for
             a transaction to commit after acquiring the storage lock.
             If the transaction takes too long, the client connection
@@ -907,7 +916,7 @@
         for name, storage in storages.items():
             self._setup_invq(name, storage)
             storage.registerDB(StorageServerDB(self, name))
-
+        self.invalidation_age = invalidation_age
         self.connections = {}
         self.dispatcher = self.DispatcherClass(addr,
                                                factory=self.new_connection)
@@ -1126,31 +1135,35 @@
         do full cache verification.
         """
 
-
-        invq = self.invq[storage_id]
-
         # We make a copy of invq because it might be modified by a
         # foreign (other than main thread) calling invalidate above.
-        invq = invq[:]
+        invq = self.invq[storage_id][:]
 
-        if not invq:
+        oids = set()
+        latest_tid = None
+        if invq and invq[-1][0] <= tid:
+            # We have needed data in the queue
+            for _tid, L in invq:
+                if _tid <= tid:
+                    break
+                oids.update(L)
+            latest_tid = invq[0][0]
+        elif (self.invalidation_age and
+              (self.invalidation_age >
+               (time.time()-ZODB.TimeStamp.TimeStamp(tid).timeTime())
+               )
+              ):
+            for t in self.storages[storage_id].iterator(p64(u64(tid)+1)):
+                for r in t:
+                    oids.add(r.oid)
+                latest_tid = t.tid
+        elif not invq:
             log("invq empty")
-            return None, []
+        else:
+            log("tid to old for invq %s < %s" % (u64(tid), u64(invq[-1][0])))
 
-        earliest_tid = invq[-1][0]
-        if earliest_tid > tid:
-            log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
-            return None, []
+        return latest_tid, list(oids)
 
-        oids = {}
-        for _tid, L in invq:
-            if _tid <= tid:
-                break
-            for key in L:
-                oids[key] = 1
-        latest_tid = invq[0][0]
-        return latest_tid, oids.keys()
-
     def close_server(self):
         """Close the dispatcher so that there are no new connections.
 

Modified: ZODB/trunk/src/ZEO/component.xml
===================================================================
--- ZODB/trunk/src/ZEO/component.xml	2009-01-02 19:36:41 UTC (rev 94460)
+++ ZODB/trunk/src/ZEO/component.xml	2009-01-02 21:22:39 UTC (rev 94461)
@@ -45,6 +45,16 @@
       </description>
     </key>
 
+    <key name="invalidation-age" datatype="float" required="no">
+      <description>
+        The maximum age of a client for which quick-verification
+        invalidations will be provided by iterating over the served
+        storage. This option should only be used if the served storage
+        supports efficient iteration from a starting point near the
+        end of the transaction history (e.g. end of file).
+      </description>
+    </key>
+
     <key name="monitor-address" datatype="socket-binding-address"
          required="no">
       <description>

Modified: ZODB/trunk/src/ZEO/runzeo.py
===================================================================
--- ZODB/trunk/src/ZEO/runzeo.py	2009-01-02 19:36:41 UTC (rev 94460)
+++ ZODB/trunk/src/ZEO/runzeo.py	2009-01-02 21:22:39 UTC (rev 94461)
@@ -100,6 +100,7 @@
         self.add("read_only", "zeo.read_only", default=0)
         self.add("invalidation_queue_size", "zeo.invalidation_queue_size",
                  default=100)
+        self.add("invalidation_age", "zeo.invalidation_age")
         self.add("transaction_timeout", "zeo.transaction_timeout",
                  "t:", "timeout=", float)
         self.add("monitor_address", "zeo.monitor_address.address",
@@ -137,8 +138,8 @@
                 if s.name is None:
                     s.name = '1'
                     break
-                
 
+
 class ZEOServer:
 
     def __init__(self, options):
@@ -243,17 +244,7 @@
             SignalHandler.registerHandler(SIGUSR2, self.handle_sigusr2)
 
     def create_server(self):
-        from ZEO.StorageServer import StorageServer
-        self.server = StorageServer(
-            self.options.address,
-            self.storages,
-            read_only=self.options.read_only,
-            invalidation_queue_size=self.options.invalidation_queue_size,
-            transaction_timeout=self.options.transaction_timeout,
-            monitor_address=self.options.monitor_address,
-            auth_protocol=self.options.auth_protocol,
-            auth_database=self.options.auth_database,
-            auth_realm=self.options.auth_realm)
+        self.server = create_server(self.storages, self.options)
 
     def loop_forever(self):
         asyncore.loop()
@@ -332,6 +323,23 @@
             except IOError:
                 logger.error("PID file '%s' could not be removed" % pidfile)
 
+
+def create_server(storages, options):
+    from ZEO.StorageServer import StorageServer
+    return StorageServer(
+        options.address,
+        storages,
+        read_only = options.read_only,
+        invalidation_queue_size = options.invalidation_queue_size,
+        invalidation_age = options.invalidation_age,
+        transaction_timeout = options.transaction_timeout,
+        monitor_address = options.monitor_address,
+        auth_protocol = options.auth_protocol,
+        auth_database = options.auth_database,
+        auth_realm = options.auth_realm,
+        )
+
+
 # Signal names
 
 signames = None

Modified: ZODB/trunk/src/ZEO/tests/forker.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/forker.py	2009-01-02 19:36:41 UTC (rev 94460)
+++ ZODB/trunk/src/ZEO/tests/forker.py	2009-01-02 21:22:39 UTC (rev 94461)
@@ -36,6 +36,7 @@
         self.address = addr
         self.read_only = None
         self.invalidation_queue_size = None
+        self.invalidation_age = None
         self.monitor_address = None
         self.transaction_timeout = None
         self.authentication_protocol = None
@@ -49,6 +50,8 @@
             print >> f, "read-only", self.read_only and "true" or "false"
         if self.invalidation_queue_size is not None:
             print >> f, "invalidation-queue-size", self.invalidation_queue_size
+        if self.invalidation_age is not None:
+            print >> f, "invalidation-age", self.invalidation_age
         if self.monitor_address is not None:
             print >> f, "monitor-address %s:%s" % self.monitor_address
         if self.transaction_timeout is not None:

Added: ZODB/trunk/src/ZEO/tests/invalidation-age.txt
===================================================================
--- ZODB/trunk/src/ZEO/tests/invalidation-age.txt	                        (rev 0)
+++ ZODB/trunk/src/ZEO/tests/invalidation-age.txt	2009-01-02 21:22:39 UTC (rev 94461)
@@ -0,0 +1,141 @@
+Invalidation age
+================
+
+When a ZEO client with a non-empty cache connects to the server, it
+needs to verify whether the data in its cache is current.  It does
+this in one of 2 ways:
+
+quick verification
+   It gets a list of invalidations from the server since the last
+   transaction the client has seen and applies those to its disk and
+   in-memory caches.  This is only possible if there haven't been too
+   many transactions since the client was last connected.
+
+full verification
+   If quick verification isn't possible, the client iterates through
+   its disk cache asking the server to verify whether each current
+   entry is valid.
+
+Unfortunately, for large caches, full verification is so slow that
+it is impractical.  Quick verification is highly
+desirable.
+
+To support quick verification, the server keeps a list of recent
+invalidations. The size of this list is controlled by the
+invalidation_queue_size parameter.  If there is a lot of database
+activity, the size might need to be quite large to support having
+clients be disconnected for more than a few minutes.  A very large
+invalidation queue size can use a lot of memory.
+
+To supplement the invalidation queue, you can also specify an
+invalidation_age parameter.  When a client connects and presents the
+last transaction id it has seen, we first check whether the
+invalidation queue reaches back to that transaction id. If it does, we
+send invalidations for all transactions since that id.  Otherwise, we
+check whether the time elapsed since the given transaction was
+committed is less than the invalidation age (in seconds).  If it is,
+then we iterate over the storage, starting just after the given id, to
+get the invalidations since the given id.
+
+NOTE: This assumes that iterating from a point near the "end" of a
+database is inexpensive. Don't use this option for a storage for which
+that is not the case.
+
+Here's an example.  We set up a server, using an
+invalidation-queue-size of 5:
+
+    >>> addr, admin = start_server(zeo_conf=dict(invalidation_queue_size=5),
+    ...                            keep=True)
+
+Now, we'll open a client with a persistent cache, set up some data,
+and then close the client:
+
+    >>> import ZEO, transaction
+    >>> db = ZEO.DB(addr, client='test')
+    >>> conn = db.open()
+    >>> for i in range(9):
+    ...     conn.root()[i] = conn.root().__class__()
+    ...     conn.root()[i].x = 0
+    >>> transaction.commit()
+    >>> db.close()
+
+We'll open another client, and commit some transactions:
+
+    >>> db = ZEO.DB(addr)
+    >>> conn = db.open()
+    >>> import transaction
+    >>> for i in range(2):
+    ...     conn.root()[i].x = 1
+    ...     transaction.commit()
+    >>> db.close()
+
+If we reopen the first client, we'll do quick verification.  We'll
+turn on logging so we can see this:
+
+    >>> import logging, sys
+    >>> logging.getLogger().setLevel(logging.INFO)
+    >>> handler = logging.StreamHandler(sys.stdout)
+    >>> logging.getLogger().addHandler(handler)
+
+    >>> db = ZEO.DB(addr, client='test') # doctest: +ELLIPSIS
+    ('localhost', ...
+    ('localhost', ...) Recovering 2 invalidations
+
+    >>> logging.getLogger().removeHandler(handler)
+
+    >>> [v.x for v in db.open().root().values()]
+    [1, 1, 0, 0, 0, 0, 0, 0, 0]
+
+Now, if we disconnect and commit more than 5 transactions, we'll see
+that full verification is necessary:
+
+    >>> db.close()
+    >>> db = ZEO.DB(addr)
+    >>> conn = db.open()
+    >>> import transaction
+    >>> for i in range(9):
+    ...     conn.root()[i].x = 2
+    ...     transaction.commit()
+    >>> db.close()
+
+    >>> logging.getLogger().addHandler(handler)
+    >>> db = ZEO.DB(addr, client='test') # doctest: +ELLIPSIS
+    ('localhost', ...
+    ('localhost', ...) Verifying cache
+    ('localhost', ...) endVerify finishing
+    ('localhost', ...) endVerify finished
+
+    >>> logging.getLogger().removeHandler(handler)
+
+    >>> [v.x for v in db.open().root().values()]
+    [2, 2, 2, 2, 2, 2, 2, 2, 2]
+
+    >>> db.close()
+
+But if we restart the server with invalidation-age set, we can
+do quick verification:
+
+    >>> stop_server(admin)
+    >>> addr, admin = start_server(zeo_conf=dict(invalidation_queue_size=5,
+    ...                                          invalidation_age=100))
+    >>> db = ZEO.DB(addr)
+    >>> conn = db.open()
+    >>> import transaction
+    >>> for i in range(9):
+    ...     conn.root()[i].x = 3
+    ...     transaction.commit()
+    >>> db.close()
+
+
+    >>> logging.getLogger().addHandler(handler)
+    >>> db = ZEO.DB(addr, client='test') # doctest: +ELLIPSIS
+    ('localhost', ...
+    ('localhost', ...) Recovering 9 invalidations
+
+    >>> logging.getLogger().removeHandler(handler)
+
+    >>> [v.x for v in db.open().root().values()]
+    [3, 3, 3, 3, 3, 3, 3, 3, 3]
+
+    >>> db.close()
+


Property changes on: ZODB/trunk/src/ZEO/tests/invalidation-age.txt
___________________________________________________________________
Added: svn:eol-style
   + native

Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py	2009-01-02 19:36:41 UTC (rev 94460)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py	2009-01-02 21:22:39 UTC (rev 94461)
@@ -68,7 +68,7 @@
     """ZEO tests that don't fit in elsewhere."""
 
     def checkCreativeGetState(self):
-        # This test covers persistent objects that provide their own 
+        # This test covers persistent objects that provide their own
         # __getstate__ which modifies the state of the object.
         # For details see bug #98275
 
@@ -456,7 +456,7 @@
             )
 
         ZEO.zrpc.connection.client_map[None] = Evil()
-        
+
         try:
             ZEO.zrpc.connection.client_trigger.pull_trigger()
         except DisconnectedError:
@@ -498,7 +498,7 @@
                 self._invalidatedCache += 1
             def invalidate(*a, **k):
                 pass
-                
+
         db = DummyDB()
         storage.registerDB(db)
 
@@ -517,8 +517,8 @@
 
         # Now, the root object in the connection should have been invalidated:
         self.assertEqual(db._invalidatedCache, base+1)
-    
 
+
 class CommonBlobTests:
 
     def getConfig(self):
@@ -638,7 +638,7 @@
         for i in range(1000000):
             somedata.write("%s\n" % i)
         somedata.seek(0)
-        
+
         blob = Blob()
         bd_fh = blob.open('w')
         ZODB.utils.cp(somedata, bd_fh)
@@ -665,7 +665,7 @@
         def check_data(path):
             self.assert_(os.path.exists(path))
             f = open(path, 'rb')
-            somedata.seek(0) 
+            somedata.seek(0)
             d1 = d2 = 1
             while d1 or d2:
                 d1 = f.read(8096)
@@ -681,7 +681,7 @@
             self.blobdir,
             ZODB.blob.BushyLayout().getBlobFilePath(oid, revid),
             )
-        
+
         self.assert_(server_filename.startswith(self.blobdir))
         check_data(server_filename)
 
@@ -800,12 +800,12 @@
     >>> for i in range(20):
     ...     o2.x = PersistentDict(); o2 = o2.x
     ...     commit()
-    
+
     >>> trans, oids = s1.getInvalidations(last)
     >>> from ZODB.utils import u64
     >>> sorted([int(u64(oid)) for oid in oids])
     [10, 11, 12, 13, 14]
-    
+
     >>> server.close_server()
     """
 
@@ -834,7 +834,7 @@
     >>> db.close()
 
 Now we'll open a storage server on the data, simulating a restart:
-    
+
     >>> fs = FileStorage('t.fs')
     >>> sv = StorageServer(('', get_port()), dict(fs=fs))
     >>> s = ZEOStorage(sv, sv.read_only)
@@ -884,7 +884,7 @@
     >>> sv = StorageServer(('', get_port()), dict(fs=fs))
     >>> st = StorageServerWrapper(sv, 'fs')
     >>> s = st.server
-    
+
 Now, if we ask for the invalidations since the last committed
 transaction, we'll get a result:
 
@@ -1070,7 +1070,7 @@
 
     >>> handler.uninstall()
     >>> stop_server(admin)
-    
+
     """
 
 def history_over_zeo():
@@ -1106,7 +1106,7 @@
     """If we delete on one client, the delete should be reflected on the other.
 
     First, we'll create an object:
-    
+
     >>> addr, _ = start_server()
     >>> db = ZEO.DB(addr)
     >>> conn = db.open()
@@ -1116,10 +1116,10 @@
 
     We verify that we can read it in another client, which also loads
     it into the client cache.
-    
+
     >>> cs = ClientStorage(addr)
     >>> p, s = cs.load(oid)
-    
+
     Now, we'll remove the object:
 
     >>> txn = transaction.begin()
@@ -1137,7 +1137,7 @@
 
     We'll wait for our other storage to get the invalidation and then
     try to access the object. We'll get a POSKeyError there too:
-    
+
     >>> tid = db.storage.lastTransaction()
     >>> forker.wait_until(lambda : cs.lastTransaction() == tid)
     >>> cs.load(oid) # doctest: +ELLIPSIS
@@ -1153,7 +1153,7 @@
     BlobAdaptedFileStorageTests, BlobWritableCacheTests,
     DemoStorageTests, FileStorageTests, MappingStorageTests,
     ]
-    
+
 quick_test_classes = [
     FileStorageRecoveryTests, ConfigurationTests, HeartbeatTests,
     CatastrophicClientLoopFailure, ConnectionInvalidationOnReconnect,
@@ -1194,7 +1194,7 @@
                                    shared_blob_dir=True)
         else:
             ClientStorage.__init__(self, addr, blob_dir=blob_dir)
-            
+
     def close(self):
         ClientStorage.close(self)
         zope.testing.setupstack.tearDown(self)
@@ -1228,7 +1228,7 @@
         doctest.DocFileSuite(
             'zeo-fan-out.test', 'zdoptions.test',
             'drop_cache_rather_than_verify.txt', 'client-config.test',
-            'protocols.test', 'zeo_blob_cache.test',
+            'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt',
             setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
             ),
         )

Modified: ZODB/trunk/src/ZEO/tests/zeoserver.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/zeoserver.py	2009-01-02 19:36:41 UTC (rev 94460)
+++ ZODB/trunk/src/ZEO/tests/zeoserver.py	2009-01-02 21:22:39 UTC (rev 94461)
@@ -13,20 +13,18 @@
 ##############################################################################
 """Helper file used to launch a ZEO server cross platform"""
 
-from ZEO.StorageServer import StorageServer
-from ZEO.runzeo import ZEOOptions
-
 import asyncore
-import os
-import sys
-import time
 import errno
 import getopt
-import socket
+import logging
+import os
 import signal
-import asyncore
+import socket
+import sys
 import threading
-import logging
+import time
+import ZEO.runzeo
+import ZEO.zrpc.connection
 
 def cleanup(storage):
     # FileStorage and the Berkeley storages have this method, which deletes
@@ -164,15 +162,14 @@
         elif opt == '-S':
             suicide = False
         elif opt == '-v':
-            import ZEO.zrpc.connection
             ZEO.zrpc.connection.Connection.current_protocol = arg
 
-    zo = ZEOOptions()
+    zo = ZEO.runzeo.ZEOOptions()
     zo.realize(["-C", configfile])
     zeo_port = int(zo.address[1])
 
     if zo.auth_protocol == "plaintext":
-        import ZEO.tests.auth_plaintext
+        __import__('ZEO.tests.auth_plaintext')
 
     # Open the config file and let ZConfig parse the data there.  Then remove
     # the config file, otherwise we'll leave turds.
@@ -185,16 +182,7 @@
     mon_addr = None
     if zo.monitor_address:
         mon_addr = zo.monitor_address
-    server = StorageServer(
-        zo.address,
-        {"1": storage},
-        read_only=zo.read_only,
-        invalidation_queue_size=zo.invalidation_queue_size,
-        transaction_timeout=zo.transaction_timeout,
-        monitor_address=mon_addr,
-        auth_protocol=zo.auth_protocol,
-        auth_database=zo.auth_database,
-        auth_realm=zo.auth_realm)
+    server = ZEO.runzeo.create_server({"1": storage}, zo)
 
     try:
         log(label, 'creating the test server, keep: %s', keep)



More information about the Zodb-checkins mailing list