[Zodb-checkins] SVN: ZODB/trunk/src/Z Cleaned up the Z309 ZEO protocol, removing versions from arguments and

Jim Fulton jim at zope.com
Tue Nov 25 18:54:19 EST 2008


Log message for revision 93355:
  Cleaned up the Z309 ZEO protocol, removing versions from arguments and
  return values.  This in turn simplified the client and server
  software.
  
  Added code to select different client and server stubs and input
  handlers depening on whether the Z309 or earlier protocols are used.
  
  ZODB 3.8 clients can now talk to ZODB 3.9 servers and the other way
  around.
  

Changed:
  U   ZODB/trunk/src/ZEO/ClientStorage.py
  D   ZODB/trunk/src/ZEO/ClientStub.py
  U   ZODB/trunk/src/ZEO/ServerStub.py
  U   ZODB/trunk/src/ZEO/StorageServer.py
  U   ZODB/trunk/src/ZEO/tests/ConnectionTests.py
  U   ZODB/trunk/src/ZEO/tests/forker.py
  A   ZODB/trunk/src/ZEO/tests/protocols.test
  U   ZODB/trunk/src/ZEO/tests/registerDB.test
  U   ZODB/trunk/src/ZEO/tests/testZEO.py
  U   ZODB/trunk/src/ZEO/tests/zeoserver.py
  U   ZODB/trunk/src/ZEO/zrpc/connection.py
  U   ZODB/trunk/src/ZODB/BaseStorage.py
  U   ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
  U   ZODB/trunk/src/ZODB/MappingStorage.py
  U   ZODB/trunk/src/ZODB/tests/testFileStorage.py

-=-
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py	2008-11-25 22:24:15 UTC (rev 93354)
+++ ZODB/trunk/src/ZEO/ClientStorage.py	2008-11-25 23:54:19 UTC (rev 93355)
@@ -109,7 +109,7 @@
     TransactionBufferClass = TransactionBuffer
     ClientCacheClass = ClientCache
     ConnectionManagerClass = ConnectionManager
-    StorageServerStubClass = ServerStub.StorageServer
+    StorageServerStubClass = ServerStub.stub
 
     def __init__(self, addr, storage='1', cache_size=20 * MB,
                  name='', client=None, debug=0, var=None,
@@ -566,7 +566,8 @@
             ZODB.interfaces.IStorageCurrentRecordIteration,
             ZODB.interfaces.IBlobStorage,
             ):
-            if (iface.__module__, iface.__name__) in self._info.get('interfaces', ()):
+            if (iface.__module__, iface.__name__) in self._info.get(
+                'interfaces', ()):
                 zope.interface.alsoProvides(self, iface)
 
     def _handle_extensions(self):
@@ -1208,8 +1209,13 @@
         self._pickler = cPickle.Pickler(self._tfile, 1)
         self._pickler.fast = 1 # Don't use the memo
 
+        if self._connection.peer_protocol_version < 'Z309':
+            client = ClientStorage308Adapter(self)
+        else:
+            client = self
+
         # allow incoming invalidations:
-        self._connection.register_object(self)
+        self._connection.register_object(client)
 
         # If verify_cache() finishes the cache verification process,
         # it should set self._server.  If it goes through full cache
@@ -1279,8 +1285,8 @@
         server.endZeoVerify()
         return "full verification"
 
-    def invalidateVerify(self, args):
-        """Server callback to invalidate an (oid, '') pair.
+    def invalidateVerify(self, oid):
+        """Server callback to invalidate an oid pair.
 
         This is called as part of cache validation.
         """
@@ -1290,7 +1296,7 @@
             # This should never happen.
             logger.error("%s invalidateVerify with no _pickler", self.__name__)
             return
-        self._pickler.dump((None, [args[0]]))
+        self._pickler.dump((None, [oid]))
 
     def endVerify(self):
         """Server callback to signal end of cache validation."""
@@ -1304,10 +1310,7 @@
         try:
             if catch_up:
                 # process catch-up invalidations
-                tid, invalidations = catch_up
-                self._process_invalidations(
-                    tid, (arg[0] for arg in invalidations)
-                    )
+                self._process_invalidations(*catch_up)
             
             if self._pickler is None:
                 return
@@ -1337,7 +1340,7 @@
         self._pending_server = None
 
 
-    def invalidateTransaction(self, tid, args):
+    def invalidateTransaction(self, tid, oids):
         """Server callback: Invalidate objects modified by tid."""
         self._lock.acquire()
         try:
@@ -1345,14 +1348,13 @@
                 logger.debug(
                     "%s Transactional invalidation during cache verification",
                     self.__name__)
-                self._pickler.dump((tid, [arg[0] for arg in args]))
+                self._pickler.dump((tid, oids))
             else:
-                self._process_invalidations(tid, (arg[0] for arg in args))
+                self._process_invalidations(tid, oids)
         finally:
             self._lock.release()
 
     def _process_invalidations(self, tid, oids):
-        oids = list(oids)
         for oid in oids:
             if oid == self._load_oid:
                 self._load_status = 0
@@ -1363,8 +1365,8 @@
 
     # The following are for compatibility with protocol version 2.0.0
 
-    def invalidateTrans(self, args):
-        return self.invalidateTransaction(None, args)
+    def invalidateTrans(self, oids):
+        return self.invalidateTransaction(None, oids)
 
     invalidate = invalidateVerify
     end = endVerify
@@ -1482,3 +1484,17 @@
             self._completed = True
             raise ZODB.interfaces.StorageStopIteration()
         return ZODB.BaseStorage.DataRecord(*item)
+
+class ClientStorage308Adapter:
+
+    def __init__(self, client):
+        self.client = client
+
+    def invalidateTransaction(self, tid, args):
+        self.client.invalidateTransaction(tid, [arg[0] for arg in args])
+
+    def invalidateVerify(self, arg):
+        self.client.invalidateVerify(arg[0])
+
+    def __getattr__(self, name):
+        return getattr(self.client, name)

Deleted: ZODB/trunk/src/ZEO/ClientStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStub.py	2008-11-25 22:24:15 UTC (rev 93354)
+++ ZODB/trunk/src/ZEO/ClientStub.py	2008-11-25 23:54:19 UTC (rev 93355)
@@ -1,77 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE
-#
-##############################################################################
-"""RPC stubs for interface exported by ClientStorage."""
-
-class ClientStorage:
-
-    """An RPC stub class for the interface exported by ClientStorage.
-
-    This is the interface presented by ClientStorage to the
-    StorageServer; i.e. the StorageServer calls these methods and they
-    are executed in the ClientStorage.
-
-    See the ClientStorage class for documentation on these methods.
-
-    It is currently important that all methods here are asynchronous
-    (meaning they don't have a return value and the caller doesn't
-    wait for them to complete), *and* that none of them cause any
-    calls from the client to the storage.  This is due to limitations
-    in the zrpc subpackage.
-
-    The on-the-wire names of some of the methods don't match the
-    Python method names.  That's because the on-the-wire protocol was
-    fixed for ZEO 2 and we don't want to change it.  There are some
-    aliases in ClientStorage.py to make up for this.
-    """
-
-    def __init__(self, rpc):
-        """Constructor.
-
-        The argument is a connection: an instance of the
-        zrpc.connection.Connection class.
-        """
-        self.rpc = rpc
-
-    def beginVerify(self):
-        self.rpc.callAsync('beginVerify')
-
-    def invalidateVerify(self, args):
-        self.rpc.callAsync('invalidateVerify', args)
-
-    def endVerify(self):
-        self.rpc.callAsync('endVerify')
-
-    def invalidateTransaction(self, tid, args):
-        self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args)
-
-    def serialnos(self, arg):
-        self.rpc.callAsync('serialnos', arg)
-
-    def info(self, arg):
-        self.rpc.callAsync('info', arg)
-
-    def storeBlob(self, oid, serial, blobfilename):
-
-        def store():
-            yield ('receiveBlobStart', (oid, serial))
-            f = open(blobfilename, 'rb')
-            while 1:
-                chunk = f.read(59000)
-                if not chunk:
-                    break
-                yield ('receiveBlobChunk', (oid, serial, chunk, ))
-            f.close()
-            yield ('receiveBlobStop', (oid, serial))
-
-        self.rpc.callAsyncIterator(store())

Modified: ZODB/trunk/src/ZEO/ServerStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ServerStub.py	2008-11-25 22:24:15 UTC (rev 93354)
+++ ZODB/trunk/src/ZEO/ServerStub.py	2008-11-25 23:54:19 UTC (rev 93355)
@@ -47,16 +47,7 @@
         zrpc.connection.Connection class.
         """
         self.rpc = rpc
-        
-        # Wait until we know what version the other side is using.
-        while rpc.peer_protocol_version is None:
-            time.sleep(0.1)
 
-        if rpc.peer_protocol_version == 'Z200':
-            self.lastTransaction = lambda: None
-            self.getInvalidations = lambda tid: None
-            self.getAuthProtocol = lambda: None
-
     def extensionMethod(self, name):
         return ExtensionMethodWrapper(self.rpc, name).call
 
@@ -111,15 +102,15 @@
         return self.rpc.call('getInvalidations', tid)
 
     ##
-    # Check whether serial numbers s and sv are current for oid.
-    # If one or both of the serial numbers are not current, the
+    # Check whether a serial number is current for oid.
+    # If the serial number is not current, the
     # server will make an asynchronous invalidateVerify() call.
     # @param oid object id
-    # @param s serial number on non-version data
+    # @param s serial number
     # @defreturn async
 
     def zeoVerify(self, oid, s):
-        self.rpc.callAsync('zeoVerify', oid, s, None)
+        self.rpc.callAsync('zeoVerify', oid, s)
 
     ##
     # Check whether current serial number is valid for oid.
@@ -130,7 +121,7 @@
     # @defreturn async
 
     def verify(self, oid, serial):
-        self.rpc.callAsync('verify', oid, '', serial)
+        self.rpc.callAsync('verify', oid, serial)
 
     ##
     # Signal to the server that cache verification is done.
@@ -183,7 +174,7 @@
     # @exception KeyError if oid is not found
 
     def loadEx(self, oid):
-        return self.rpc.call("loadEx", oid, '')[:2]
+        return self.rpc.call("loadEx", oid)
 
     ##
     # Return non-current data along with transaction ids that identify
@@ -207,12 +198,11 @@
     # @defreturn async
 
     def storea(self, oid, serial, data, id):
-        self.rpc.callAsync('storea', oid, serial, data, '', id)
+        self.rpc.callAsync('storea', oid, serial, data, id)
 
     def restorea(self, oid, serial, data, prev_txn, id):
         self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id)
 
-
     def storeBlob(self, oid, serial, data, blobfilename, txn):
 
         # Store a blob to the server.  We don't want to real all of
@@ -228,13 +218,12 @@
                     break
                 yield ('storeBlobChunk', (chunk, ))
             f.close()
-            yield ('storeBlobEnd', (oid, serial, data, '', id(txn)))
+            yield ('storeBlobEnd', (oid, serial, data, id(txn)))
 
         self.rpc.callAsyncIterator(store())
 
     def storeBlobShared(self, oid, serial, data, filename, id):
-        self.rpc.callAsync('storeBlobShared', oid, serial, data, filename, 
-                           '', id)
+        self.rpc.callAsync('storeBlobShared', oid, serial, data, filename, id)
 
     ##
     # Start two-phase commit for a transaction
@@ -262,16 +251,13 @@
 
     def history(self, oid, length=None):
         if length is None:
-            return self.rpc.call('history', oid, '')
+            return self.rpc.call('history', oid)
         else:
-            return self.rpc.call('history', oid, '', length)
+            return self.rpc.call('history', oid, length)
 
     def record_iternext(self, next):
         return self.rpc.call('record_iternext', next)
 
-    def load(self, oid):
-        return self.rpc.call('load', oid, '')
-
     def sendBlob(self, oid, serial):
         return self.rpc.call('sendBlob', oid, serial)
 
@@ -284,9 +270,6 @@
     def new_oid(self):
         return self.rpc.call('new_oid')
 
-    def store(self, oid, serial, data, trans):
-        return self.rpc.call('store', oid, serial, data, '', trans)
-
     def undo(self, trans_id, trans):
         return self.rpc.call('undo', trans_id, trans)
 
@@ -311,7 +294,91 @@
     def iterator_gc(self, iids):
         return self.rpc.callAsync('iterator_gc', iids)
 
+class StorageServer308(StorageServer):
 
+    def __init__(self, rpc):
+        if rpc.peer_protocol_version == 'Z200':
+            self.lastTransaction = lambda: None
+            self.getInvalidations = lambda tid: None
+            self.getAuthProtocol = lambda: None
+
+        StorageServer.__init__(self, rpc)
+
+    def history(self, oid, length=None):
+        if length is None:
+            return self.rpc.call('history', oid, '')
+        else:
+            return self.rpc.call('history', oid, '', length)
+
+    def getInvalidations(self, tid):
+        # Not in protocol version 2.0.0; see __init__()
+        result = self.rpc.call('getInvalidations', tid)
+        if result is not None:
+            result = result[0], [oid for (oid, version) in result[1]]
+        return result
+
+    def verify(self, oid, serial):
+        self.rpc.callAsync('verify', oid, '', serial)
+    
+    def loadEx(self, oid):
+        return self.rpc.call("loadEx", oid, '')[:2]
+
+    def storea(self, oid, serial, data, id):
+        self.rpc.callAsync('storea', oid, serial, data, '', id)
+
+    def storeBlob(self, oid, serial, data, blobfilename, txn):
+
+        # Store a blob to the server.  We don't want to real all of
+        # the data into memory, so we use a message iterator.  This
+        # allows us to read the blob data as needed.
+
+        def store():
+            yield ('storeBlobStart', ())
+            f = open(blobfilename, 'rb')
+            while 1:
+                chunk = f.read(59000)
+                if not chunk:
+                    break
+                yield ('storeBlobChunk', (chunk, ))
+            f.close()
+            yield ('storeBlobEnd', (oid, serial, data, '', id(txn)))
+
+        self.rpc.callAsyncIterator(store())
+
+    def storeBlobShared(self, oid, serial, data, filename, id):
+        self.rpc.callAsync('storeBlobShared', oid, serial, data, filename, 
+                           '', id)
+
+    def zeoVerify(self, oid, s):
+        self.rpc.callAsync('zeoVerify', oid, s, None)
+
+    def iterator_start(self, start, stop):
+        raise NotImplementedError
+
+    def iterator_next(self, iid):
+        raise NotImplementedError
+
+    def iterator_record_start(self, txn_iid, tid):
+        raise NotImplementedError
+
+    def iterator_record_next(self, iid):
+        raise NotImplementedError
+
+    def iterator_gc(self, iids):
+        raise NotImplementedError
+
+
+def stub(client, connection):
+        
+    # Wait until we know what version the other side is using.
+    while connection.peer_protocol_version is None:
+        time.sleep(0.1)
+
+    if connection.peer_protocol_version < 'Z309':
+        return StorageServer308(connection)
+    return StorageServer(connection)
+
+
 class ExtensionMethodWrapper:
     def __init__(self, rpc, name):
         self.rpc = rpc

Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py	2008-11-25 22:24:15 UTC (rev 93354)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2008-11-25 23:54:19 UTC (rev 93355)
@@ -37,7 +37,6 @@
 import ZEO.zrpc.error
 
 import zope.interface
-from ZEO import ClientStub
 from ZEO.CommitLog import CommitLog
 from ZEO.monitor import StorageStats, StatsServer
 from ZEO.zrpc.server import Dispatcher
@@ -81,9 +80,6 @@
 class ZEOStorage:
     """Proxy to underlying storage for a single remote client."""
 
-    # Classes we instantiate.  A subclass might override.
-    ClientStorageStubClass = ClientStub.ClientStorage
-
     # A list of extension methods.  A subclass with extra methods
     # should override.
     extensions = []
@@ -128,7 +124,12 @@
 
     def notifyConnected(self, conn):
         self.connection = conn # For restart_other() below
-        self.client = self.ClientStorageStubClass(conn)
+        assert conn.peer_protocol_version is not None
+        if conn.peer_protocol_version < 'Z309':
+            self.client = ClientStub308(conn)
+            conn.register_object(ZEOStorage308Adapter(self))
+        else:
+            self.client = ClientStub(conn)
         addr = conn.addr
         if isinstance(addr, type("")):
             label = addr
@@ -207,7 +208,6 @@
                 self.tpc_transaction = lambda : storage._transaction
             else:
                 raise
-                
 
     def _check_tid(self, tid, exc=None):
         if self.read_only:
@@ -288,7 +288,6 @@
                 'size': storage.getSize(),
                 'name': storage.getName(),
                 'supportsUndo': supportsUndo,
-                'supportsVersions': False,
                 'extensionMethods': self.getExtensionMethods(),
                 'supports_record_iternext': hasattr(self, 'record_iternext'),
                 'interfaces': tuple(interfaces),
@@ -302,23 +301,14 @@
     def getExtensionMethods(self):
         return self._extensions
 
-    def loadEx(self, oid, version=''):
+    def loadEx(self, oid):
         self.stats.loads += 1
-        if version:
-            raise StorageServerError("Versions aren't supported.")
+        return self.storage.load(oid, '')
 
-        data, serial = self.storage.load(oid, '')
-        return data, serial, ''
-
     def loadBefore(self, oid, tid):
         self.stats.loads += 1
         return self.storage.loadBefore(oid, tid)
 
-    def zeoLoad(self, oid):
-        self.stats.loads += 1
-        p, s = self.storage.load(oid, '')
-        return p, s, '', None, None
-
     def getInvalidations(self, tid):
         invtid, invlist = self.server.get_invalidations(self.storage_id, tid)
         if invtid is None:
@@ -327,18 +317,16 @@
                  % (len(invlist), u64(invtid)))
         return invtid, invlist
 
-    def verify(self, oid, version, tid):
-        if version:
-            raise StorageServerError("Versions aren't supported.")
+    def verify(self, oid, tid):
         try:
             t = self.getTid(oid)
         except KeyError:
-            self.client.invalidateVerify((oid, ""))
+            self.client.invalidateVerify(oid)
         else:
             if tid != t:
-                self.client.invalidateVerify((oid, ''))
+                self.client.invalidateVerify(oid)
 
-    def zeoVerify(self, oid, s, sv=None):
+    def zeoVerify(self, oid, s):
         if not self.verifying:
             self.verifying = 1
             self.stats.verifying_clients += 1
@@ -351,11 +339,8 @@
             # invalidation is right.  It could be an application bug
             # that left a dangling reference, in which case it's bad.
         else:
-            if sv:
-                raise StorageServerError("Versions aren't supported.")
-            else:
-                if s != os:
-                    self.client.invalidateVerify((oid, ''))
+            if s != os:
+                self.client.invalidateVerify((oid, ''))
 
     def endZeoVerify(self):
         if self.verifying:
@@ -483,9 +468,7 @@
     # Most of the real implementations are in methods beginning with
     # an _.
 
-    def storea(self, oid, serial, data, version, id):
-        if version:
-            raise StorageServerError("Versions aren't supported.")
+    def storea(self, oid, serial, data, id):
         self._check_tid(id, exc=StorageTransactionError)
         self.stats.stores += 1
         self.txnlog.store(oid, serial, data)
@@ -503,17 +486,13 @@
     def storeBlobChunk(self, chunk):
         os.write(self.blob_tempfile[0], chunk)
 
-    def storeBlobEnd(self, oid, serial, data, version, id):
-        if version:
-            raise StorageServerError("Versions aren't supported.")
+    def storeBlobEnd(self, oid, serial, data, id):
         fd, tempname = self.blob_tempfile
         self.blob_tempfile = None
         os.close(fd)
         self.blob_log.append((oid, serial, data, tempname))
 
-    def storeBlobShared(self, oid, serial, data, filename, version, id):
-        if version:
-            raise StorageServerError("Versions aren't supported.")
+    def storeBlobShared(self, oid, serial, data, filename, id):
         # Reconstruct the full path from the filename in the OID directory
         filename = os.path.join(self.storage.fshelper.getPathForOID(oid),
                                 filename)
@@ -570,7 +549,7 @@
             newserial = [(oid, err)]
         else:
             if serial != "\0\0\0\0\0\0\0\0":
-                self.invalidated.append((oid, ''))
+                self.invalidated.append(oid)
 
             if isinstance(newserial, str):
                 newserial = [(oid, newserial)]
@@ -633,8 +612,7 @@
 
     def _undo(self, trans_id):
         tid, oids = self.storage.undo(trans_id, self.transaction)
-        inv = [(oid, None) for oid in oids]
-        self.invalidated.extend(inv)
+        self.invalidated.extend(oids)
         return tid, oids
 
     # When a delayed transaction is restarted, the dance is
@@ -723,20 +701,6 @@
         else:
             return 1
 
-    def modifiedInVersion(self, oid):
-        return ''
-
-    def versions(self):
-        return ()
-
-    def versionEmpty(self, version):
-        return True
-
-    def commitVersion(self, *a, **k):
-        raise NotImplementedError
-
-    abortVersion = commitVersion
-
     # IStorageIteration support
 
     def iterator_start(self, start, stop):
@@ -785,7 +749,6 @@
             item = (info.oid,
                     info.tid,
                     info.data,
-                    info.version,
                     info.data_txn)
         return item
 
@@ -805,10 +768,7 @@
         if version:
             raise StorageServerError("Versions aren't supported.")
         storage_id = self.storage_id
-        self.server.invalidate(
-            None, storage_id, tid,
-            [(oid, '') for oid in oids],
-            )
+        self.server.invalidate(None, storage_id, tid, oids)
         for zeo_server in self.server.connections.get(storage_id, ())[:]:
             try:
                 zeo_server.connection.poll()
@@ -1081,7 +1041,7 @@
 
         This is called from several ZEOStorage methods.
 
-        invalidated is a sequence of oid, empty-string pairs.
+        invalidated is a sequence of oids.
 
         This can do three different things:
 
@@ -1306,3 +1266,129 @@
             self.delay.error(sys.exc_info())
         else:
             self.delay.reply(result)
+
+
+class ClientStub:
+
+    def __init__(self, rpc):
+        self.rpc = rpc
+
+    def beginVerify(self):
+        self.rpc.callAsync('beginVerify')
+
+    def invalidateVerify(self, args):
+        self.rpc.callAsync('invalidateVerify', args)
+
+    def endVerify(self):
+        self.rpc.callAsync('endVerify')
+
+    def invalidateTransaction(self, tid, args):
+        self.rpc.callAsyncNoPoll('invalidateTransaction', tid, args)
+
+    def serialnos(self, arg):
+        self.rpc.callAsync('serialnos', arg)
+
+    def info(self, arg):
+        self.rpc.callAsync('info', arg)
+
+    def storeBlob(self, oid, serial, blobfilename):
+
+        def store():
+            yield ('receiveBlobStart', (oid, serial))
+            f = open(blobfilename, 'rb')
+            while 1:
+                chunk = f.read(59000)
+                if not chunk:
+                    break
+                yield ('receiveBlobChunk', (oid, serial, chunk, ))
+            f.close()
+            yield ('receiveBlobStop', (oid, serial))
+
+        self.rpc.callAsyncIterator(store())
+
+class ClientStub308(ClientStub):
+
+    def invalidateTransaction(self, tid, args):
+        self.rpc.callAsyncNoPoll(
+            'invalidateTransaction', tid, [(arg, '') for arg in args])
+
+    def invalidateVerify(self, oid):
+        self.rpc.callAsync('invalidateVerify', (oid, ''))
+
+class ZEOStorage308Adapter:
+
+    def __init__(self, storage):
+        self.storage = storage
+
+    def getSerial(self, oid):
+        return self.storage.loadEx(oid)[1] # Z200
+
+    def history(self, oid, version, size=1):
+        if version:
+            raise ValueError("Versions aren't supported.")
+        return self.storage.history(oid, size)
+
+    def getInvalidations(self, tid):
+        result = self.storage.getInvalidations(tid)
+        if result is not None:
+            result = result[0], [(oid, '') for oid in result[1]]
+        return result
+
+    def verify(self, oid, version, tid):
+        if version:
+            raise StorageServerError("Versions aren't supported.")
+        return self.storage.verify(oid, tid)
+
+    def loadEx(self, oid, version=''):
+        if version:
+            raise StorageServerError("Versions aren't supported.")
+        data, serial = self.storage.loadEx(oid)
+        return data, serial, ''
+
+    def storea(self, oid, serial, data, version, id):
+        if version:
+            raise StorageServerError("Versions aren't supported.")
+        self.storage.storea(oid, serial, data, id)
+
+    def storeBlobEnd(self, oid, serial, data, version, id):
+        if version:
+            raise StorageServerError("Versions aren't supported.")
+        self.storage.storeBlobEnd(oid, serial, data, id)
+
+    def storeBlobShared(self, oid, serial, data, filename, version, id):
+        if version:
+            raise StorageServerError("Versions aren't supported.")
+        self.storage.storeBlobShared(oid, serial, data, filename, id)
+
+    def getInfo(self):
+        result = self.storage.getInfo()
+        result['supportsVersions'] = False
+        return result
+
+    def zeoVerify(self, oid, s, sv=None):
+        if sv:
+            raise StorageServerError("Versions aren't supported.")
+        self.storage.zeoVerify(oid, s)
+
+    def modifiedInVersion(self, oid):
+        return ''
+
+    def versions(self):
+        return ()
+
+    def versionEmpty(self, version):
+        return True
+
+    def commitVersion(self, *a, **k):
+        raise NotImplementedError
+
+    abortVersion = commitVersion
+
+    def zeoLoad(self, oid):             # Z200
+        p, s = self.storage,loadEx(oid)
+        return p, s, '', None, None
+
+    def __getattr__(self, name):
+        return getattr(self.storage, name)
+
+    

Modified: ZODB/trunk/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/ConnectionTests.py	2008-11-25 22:24:15 UTC (rev 93354)
+++ ZODB/trunk/src/ZEO/tests/ConnectionTests.py	2008-11-25 23:54:19 UTC (rev 93355)
@@ -620,7 +620,7 @@
         perstorage = self.openClientStorage(cache="test")
         self.assertEqual(perstorage.verify_result, "quick verification")
         self.assertEqual(perstorage._server._last_invals,
-                         (revid, [(oid, '')]))
+                         (revid, [oid]))
 
         self.assertEqual(perstorage.load(oid, ''),
                          self._storage.load(oid, ''))

Modified: ZODB/trunk/src/ZEO/tests/forker.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/forker.py	2008-11-25 22:24:15 UTC (rev 93354)
+++ ZODB/trunk/src/ZEO/tests/forker.py	2008-11-25 23:54:19 UTC (rev 93355)
@@ -86,7 +86,7 @@
 
 
 def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
-                     path='Data.fs'):
+                     path='Data.fs', protocol=None):
     """Start a ZEO server in a separate process.
 
     Takes two positional arguments a string containing the storage conf
@@ -126,6 +126,9 @@
     args = [qa(sys.executable), qa(script), '-C', qa(tmpfile)]
     if keep:
         args.append("-k")
+    if protocol:
+        args.extend(["-v", protocol])
+        
     d = os.environ.copy()
     d['PYTHONPATH'] = os.pathsep.join(sys.path)
 
@@ -276,7 +279,7 @@
     servers = {}
 
     def start_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
-                     addr=None):
+                     addr=None, path='Data.fs', protocol=None):
         """Start a ZEO server.
 
         Return the server and admin addresses.
@@ -289,7 +292,7 @@
         elif addr is not None:
             raise TypeError("Can't specify port and addr")
         addr, adminaddr, pid, config_path = start_zeo_server(
-            storage_conf, zeo_conf, port, keep)
+            storage_conf, zeo_conf, port, keep, path, protocol)
         os.remove(config_path)
         servers[adminaddr] = pid
         return addr, adminaddr

Added: ZODB/trunk/src/ZEO/tests/protocols.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/protocols.test	                        (rev 0)
+++ ZODB/trunk/src/ZEO/tests/protocols.test	2008-11-25 23:54:19 UTC (rev 93355)
@@ -0,0 +1,173 @@
+Test that multiple protocols are supported
+==========================================
+
+A full test of all protocols isn't practical.  But we'll do a limited
+test that at least the current and previous protocols are supported in
+both directions.
+
+Let's start a Z308 server
+
+    >>> storage_conf = '''
+    ... <blobstorage>
+    ...    blob-dir server-blobs
+    ...    <filestorage>
+    ...       path Data.fs
+    ...    </filestorage>
+    ... </blobstorage>
+    ... '''
+
+    >>> addr, admin = start_server(
+    ...    storage_conf, dict(invalidation_queue_size=5), protocol='Z308')
+
+A current client should be able to connect to a old server:
+
+    >>> import ZEO, ZODB.blob, transaction
+    >>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
+    >>> wait_connected(db.storage)
+    >>> db.storage._connection.peer_protocol_version
+    'Z308'
+
+    >>> conn = db.open()
+    >>> conn.root().x = 0
+    >>> transaction.commit()
+    >>> len(db.history(conn.root()._p_oid, 99))
+    2
+
+    >>> conn.root()['blob1'] = ZODB.blob.Blob()
+    >>> conn.root()['blob1'].open('w').write('blob data 1')
+    >>> transaction.commit()
+
+    >>> db2 = ZEO.DB(addr, blob_dir='server-blobs', shared_blob_dir=True)
+    >>> wait_connected(db2.storage)
+    >>> conn2 = db2.open()
+    >>> for i in range(5):
+    ...     conn2.root().x += 1
+    ...     transaction.commit()
+    >>> conn2.root()['blob2'] = ZODB.blob.Blob()
+    >>> conn2.root()['blob2'].open('w').write('blob data 2')
+    >>> transaction.commit()
+
+
+    >>> conn.sync()
+    >>> conn.root().x
+    5
+
+    >>> db.close()
+
+    >>> for i in range(2):
+    ...     conn2.root().x += 1
+    ...     transaction.commit()
+
+    >>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
+    >>> wait_connected(db.storage)
+    >>> conn = db.open()
+    >>> conn.root().x
+    7
+
+    >>> db.close()
+
+    >>> for i in range(10):
+    ...     conn2.root().x += 1
+    ...     transaction.commit()
+
+    >>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
+    >>> wait_connected(db.storage)
+    >>> conn = db.open()
+    >>> conn.root().x
+    17
+
+    >>> conn.root()['blob1'].open().read()
+    'blob data 1'
+    >>> conn.root()['blob2'].open().read()
+    'blob data 2'
+
+Note that when taking to a 3.8 server, iteration won't work:
+
+    >>> db.storage.iterator()
+    Traceback (most recent call last):
+    ...
+    NotImplementedError
+
+    >>> db2.close()
+    >>> db.close()
+    >>> stop_server(admin)
+
+    >>> import os, zope.testing.setupstack
+    >>> os.remove('client-1.zec')
+    >>> zope.testing.setupstack.rmtree('blobs')
+    >>> zope.testing.setupstack.rmtree('server-blobs')
+    
+And the other way around:
+
+    >>> addr, _ = start_server(storage_conf, dict(invalidation_queue_size=5))
+
+Note that we'll have to pull some hijinks:
+
+    >>> import ZEO.zrpc.connection
+    >>> old_current_protocol = ZEO.zrpc.connection.Connection.current_protocol
+    >>> ZEO.zrpc.connection.Connection.current_protocol = 'Z308'
+
+    >>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
+    >>> db.storage._connection.peer_protocol_version
+    'Z308'
+    >>> wait_connected(db.storage)
+    >>> conn = db.open()
+    >>> conn.root().x = 0
+    >>> transaction.commit()
+    >>> len(db.history(conn.root()._p_oid, 99))
+    2
+
+    >>> conn.root()['blob1'] = ZODB.blob.Blob()
+    >>> conn.root()['blob1'].open('w').write('blob data 1')
+    >>> transaction.commit()
+
+    >>> db2 = ZEO.DB(addr, blob_dir='server-blobs', shared_blob_dir=True)
+    >>> wait_connected(db2.storage)
+    >>> conn2 = db2.open()
+    >>> for i in range(5):
+    ...     conn2.root().x += 1
+    ...     transaction.commit()
+    >>> conn2.root()['blob2'] = ZODB.blob.Blob()
+    >>> conn2.root()['blob2'].open('w').write('blob data 2')
+    >>> transaction.commit()
+
+
+    >>> conn.sync()
+    >>> conn.root().x
+    5
+
+    >>> db.close()
+
+    >>> for i in range(2):
+    ...     conn2.root().x += 1
+    ...     transaction.commit()
+
+    >>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
+    >>> wait_connected(db.storage)
+    >>> conn = db.open()
+    >>> conn.root().x
+    7
+
+    >>> db.close()
+
+    >>> for i in range(10):
+    ...     conn2.root().x += 1
+    ...     transaction.commit()
+
+    >>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
+    >>> wait_connected(db.storage)
+    >>> conn = db.open()
+    >>> conn.root().x
+    17
+
+    >>> conn.root()['blob1'].open().read()
+    'blob data 1'
+    >>> conn.root()['blob2'].open().read()
+    'blob data 2'
+
+    >>> db2.close()
+    >>> db.close()
+
+Undo the hijinks:
+
+    >>> ZEO.zrpc.connection.Connection.current_protocol = old_current_protocol


Property changes on: ZODB/trunk/src/ZEO/tests/protocols.test
___________________________________________________________________
Added: svn:eol-style
   + native

Modified: ZODB/trunk/src/ZEO/tests/registerDB.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/registerDB.test	2008-11-25 22:24:15 UTC (rev 93354)
+++ ZODB/trunk/src/ZEO/tests/registerDB.test	2008-11-25 23:54:19 UTC (rev 93355)
@@ -9,8 +9,8 @@
 We'll create a Faux storage that has a registerDB method.
 
     >>> class FauxStorage:
-    ...     invalidations = [('trans0', [('ob0', '')]), 
-    ...                      ('trans1', [('ob0', ''), ('ob1', '')]),
+    ...     invalidations = [('trans0', ['ob0']), 
+    ...                      ('trans1', ['ob0', 'ob1']),
     ...                      ]
     ...     def registerDB(self, db):
     ...         self.db = db
@@ -85,24 +85,24 @@
 
     >>> storage.db.invalidate('trans2', ['ob1', 'ob2'])
     invalidateTransaction trans2 1
-    [('ob1', ''), ('ob2', '')]
+    ['ob1', 'ob2']
     invalidateTransaction trans2 2
-    [('ob1', ''), ('ob2', '')]
+    ['ob1', 'ob2']
 
     >>> storage.db.invalidate('trans3', ['ob1', 'ob2'])
     invalidateTransaction trans3 1
-    [('ob1', ''), ('ob2', '')]
+    ['ob1', 'ob2']
     invalidateTransaction trans3 2
-    [('ob1', ''), ('ob2', '')]
+    ['ob1', 'ob2']
 
 The storage servers queue will reflect the invalidations:
 
     >>> for tid, invalidated in server.invq['t']:
     ...     print repr(tid), invalidated
-    'trans3' [('ob1', ''), ('ob2', '')]
-    'trans2' [('ob1', ''), ('ob2', '')]
-    'trans1' [('ob0', ''), ('ob1', '')]
-    'trans0' [('ob0', '')]
+    'trans3' ['ob1', 'ob2']
+    'trans2' ['ob1', 'ob2']
+    'trans1' ['ob0', 'ob1']
+    'trans0' ['ob0']
 
 If we call invalidateCache, the storage server will close each of it's
 connections:
@@ -117,5 +117,5 @@
 
     >>> for tid, invalidated in server.invq['t']:
     ...     print repr(tid), invalidated
-    'trans1' [('ob0', ''), ('ob1', '')]
-    'trans0' [('ob0', '')]
+    'trans1' ['ob0', 'ob1']
+    'trans0' ['ob0']

Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py	2008-11-25 22:24:15 UTC (rev 93354)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py	2008-11-25 23:54:19 UTC (rev 93355)
@@ -818,7 +818,7 @@
         return result
 
     def store(self, oid, serial, data, version_ignored, transaction):
-        self.server.storea(oid, serial, data, '', id(transaction))
+        self.server.storea(oid, serial, data, id(transaction))
 
     def tpc_finish(self, transaction, func = lambda: None):
         self.server.tpc_finish(id(transaction))
@@ -859,7 +859,7 @@
     
     >>> trans, oids = s1.getInvalidations(last)
     >>> from ZODB.utils import u64
-    >>> sorted([int(u64(oid)) for (oid, v) in oids])
+    >>> sorted([int(u64(oid)) for oid in oids])
     [10, 11, 12, 13, 14]
     
     >>> server.close_server()
@@ -913,7 +913,7 @@
 
 
     >>> from ZODB.utils import u64
-    >>> sorted([int(u64(oid)) for (oid, _) in oids])
+    >>> sorted([int(u64(oid)) for oid in oids])
     [0, 92, 93, 94, 95, 96, 97, 98, 99, 100]
 
 (Note that the fact that we get oids for 92-100 is actually an
@@ -961,7 +961,7 @@
     >>> ntid == last[-1]
     True
 
-    >>> sorted([int(u64(oid)) for (oid, _) in oids])
+    >>> sorted([int(u64(oid)) for oid in oids])
     [0, 101, 102, 103, 104]
 
     >>> fs.close()
@@ -970,9 +970,11 @@
 def tpc_finish_error():
     r"""Server errors in tpc_finish weren't handled properly.
 
-    >>> import ZEO.ClientStorage
+    >>> import ZEO.ClientStorage, ZEO.zrpc.connection
 
     >>> class Connection:
+    ...     peer_protocol_version = (
+    ...         ZEO.zrpc.connection.Connection.current_protocol)
     ...     def __init__(self, client):
     ...         self.client = client
     ...     def get_addr(self):
@@ -1127,6 +1129,22 @@
     
     """
 
+def history_over_zeo():
+    """
+    >>> addr, _ = start_server()
+    >>> import ZEO, ZODB.blob, transaction
+    >>> db = ZEO.DB(addr)
+    >>> wait_connected(db.storage)
+    >>> conn = db.open()
+    >>> conn.root().x = 0
+    >>> transaction.commit()
+    >>> len(db.history(conn.root()._p_oid, 99))
+    2
+
+    >>> db.close()
+    """
+
+
 slow_test_classes = [
     BlobAdaptedFileStorageTests, BlobWritableCacheTests,
     DemoStorageTests, FileStorageTests, MappingStorageTests,
@@ -1150,6 +1168,7 @@
         doctest.DocFileSuite(
             'zeo-fan-out.test', 'zdoptions.test',
             'drop_cache_rather_than_verify.txt',
+            'protocols.test',
             setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
             ),
         )

Modified: ZODB/trunk/src/ZEO/tests/zeoserver.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/zeoserver.py	2008-11-25 22:24:15 UTC (rev 93354)
+++ ZODB/trunk/src/ZEO/tests/zeoserver.py	2008-11-25 23:54:19 UTC (rev 93355)
@@ -13,6 +13,9 @@
 ##############################################################################
 """Helper file used to launch a ZEO server cross platform"""
 
+from ZEO.StorageServer import StorageServer
+from ZEO.runzeo import ZEOOptions
+
 import asyncore
 import os
 import sys
@@ -25,10 +28,6 @@
 import threading
 import logging
 
-from ZEO.StorageServer import StorageServer
-from ZEO.runzeo import ZEOOptions
-
-
 def cleanup(storage):
     # FileStorage and the Berkeley storages have this method, which deletes
     # all files and directories used by the storage.  This prevents @-files
@@ -155,12 +154,15 @@
     keep = 0
     configfile = None
     # Parse the arguments and let getopt.error percolate
-    opts, args = getopt.getopt(sys.argv[1:], 'kC:')
+    opts, args = getopt.getopt(sys.argv[1:], 'kC:v:')
     for opt, arg in opts:
         if opt == '-k':
             keep = 1
         elif opt == '-C':
             configfile = arg
+        elif opt == '-v':
+            import ZEO.zrpc.connection
+            ZEO.zrpc.connection.Connection.current_protocol = arg
 
     zo = ZEOOptions()
     zo.realize(["-C", configfile])

Modified: ZODB/trunk/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/connection.py	2008-11-25 22:24:15 UTC (rev 93354)
+++ ZODB/trunk/src/ZEO/zrpc/connection.py	2008-11-25 23:54:19 UTC (rev 93355)
@@ -350,11 +350,11 @@
 
     # If we're a client, an exhaustive list of the server protocols we
     # can accept.
-    servers_we_can_talk_to = [current_protocol]
+    servers_we_can_talk_to = ["Z308", current_protocol]
 
     # If we're a server, an exhaustive list of the client protocols we
     # can accept.
-    clients_we_can_talk_to = ["Z200", "Z201", "Z303", current_protocol]
+    clients_we_can_talk_to = ["Z200", "Z201", "Z303", "Z308", current_protocol]
 
     # This is pretty excruciating.  Details:
     #
@@ -779,24 +779,25 @@
         
 class ManagedServerConnection(Connection):
     """Server-side Connection subclass."""
-    __super_init = Connection.__init__
-    __super_close = Connection.close
 
     # Servers use a shared server trigger that uses the asyncore socket map
     trigger = trigger()
 
     def __init__(self, sock, addr, obj, mgr):
         self.mgr = mgr
-        self.__super_init(sock, addr, obj, 'S')
-        self.obj.notifyConnected(self)
+        Connection.__init__(self, sock, addr, obj, 'S')
 
     def handshake(self):
         # Send the server's preferred protocol to the client.
         self.message_output(self.current_protocol)
 
+    def recv_handshake(self, proto):
+        Connection.recv_handshake(self, proto)
+        self.obj.notifyConnected(self)
+
     def close(self):
         self.obj.notifyDisconnected()
-        self.__super_close()
+        Connection.close(self)
 
 class ManagedClientConnection(Connection):
     """Client-side Connection subclass."""

Modified: ZODB/trunk/src/ZODB/BaseStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/BaseStorage.py	2008-11-25 22:24:15 UTC (rev 93354)
+++ ZODB/trunk/src/ZODB/BaseStorage.py	2008-11-25 23:54:19 UTC (rev 93355)
@@ -425,8 +425,7 @@
 
     version = ''
 
-    def __init__(self, oid, tid, data, version, prev):
-        assert not version, "versions are no-longer supported"
+    def __init__(self, oid, tid, data, prev):
         self.oid = oid
         self.tid = tid
         self.data = data

Modified: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2008-11-25 22:24:15 UTC (rev 93354)
+++ ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2008-11-25 23:54:19 UTC (rev 93355)
@@ -1090,7 +1090,7 @@
                 pos = pos - 8 - u64(read(8))
 
             seek(0)
-            return [(trans.tid, [(r.oid, '') for r in trans])
+            return [(trans.tid, [r.oid for r in trans])
                     for trans in FileIterator(self._file_name, pos=pos)]
         finally:
             self._lock_release()
@@ -1711,7 +1711,7 @@
 class Record(BaseStorage.DataRecord):
 
     def __init__(self, oid, tid, data, prev, pos):
-        super(Record, self).__init__(oid, tid, data, '', prev)
+        super(Record, self).__init__(oid, tid, data, prev)
         self.pos = pos
 
 

Modified: ZODB/trunk/src/ZODB/MappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/MappingStorage.py	2008-11-25 22:24:15 UTC (rev 93354)
+++ ZODB/trunk/src/ZODB/MappingStorage.py	2008-11-25 23:54:19 UTC (rev 93355)
@@ -332,7 +332,7 @@
 
     def __iter__(self):
         for oid, data in self.data.items():
-            yield DataRecord(oid, self.tid, data, None)
+            yield DataRecord(oid, self.tid, data)
 
     def pack(self, oid):
         self.status = 'p'
@@ -345,12 +345,12 @@
     zope.interface.implements(ZODB.interfaces.IStorageRecordInformation)
 
     version = ''
+    data_txn = None
 
-    def __init__(self, oid, tid, data, prev):
+    def __init__(self, oid, tid, data):
         self.oid = oid
         self.tid = tid
         self.data = data
-        self.data_txn = prev
 
 def DB(*args, **kw):
     return ZODB.DB(MappingStorage(), *args, **kw)

Modified: ZODB/trunk/src/ZODB/tests/testFileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testFileStorage.py	2008-11-25 22:24:15 UTC (rev 93354)
+++ ZODB/trunk/src/ZODB/tests/testFileStorage.py	2008-11-25 23:54:19 UTC (rev 93355)
@@ -471,7 +471,7 @@
     True
 
     >>> from ZODB.utils import u64
-    >>> [[int(u64(oid)) for (oid, version) in oids]
+    >>> [[int(u64(oid)) for oid in oids]
     ...  for (i, oids) in invalidations]
     ... # doctest: +NORMALIZE_WHITESPACE
     [[0, 91], [0, 92], [0, 93], [0, 94], [0, 95],



More information about the Zodb-checkins mailing list