[Zodb-checkins] CVS: Zope3/src/zodb/zeo - stubs.py:1.4 server.py:1.6 client.py:1.4

Jeremy Hylton jeremy@zope.com
Wed, 5 Feb 2003 18:28:59 -0500


Update of /cvs-repository/Zope3/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv16759/src/zodb/zeo

Modified Files:
	stubs.py server.py client.py 
Log Message:
Merge storage-interface-branch to trunk.

Rename storage methods, mostly from underscore names to camel case.
    new_oid => newObjectId
    tpc_begin => tpcBegin
    tpc_vote => tpcVote
    tpc_finish => tpcFinish
    tpc_abort => tpcAbort
    transactionalUndo => undo

Remove some methods from storage interfaces.
Move storage-related exceptions to zodb.storage.interfaces.
Add __implements__ to storages and use it for introspection in place of
the supportsXXX() methods.


=== Zope3/src/zodb/zeo/stubs.py 1.3 => 1.4 ===
--- Zope3/src/zodb/zeo/stubs.py:1.3	Fri Jan 24 18:20:49 2003
+++ Zope3/src/zodb/zeo/stubs.py	Wed Feb  5 18:28:24 2003
@@ -112,12 +112,6 @@
     def endZeoVerify(self):
         self.rpc.callAsync('endZeoVerify')
 
-    def new_oids(self, n=None):
-        if n is None:
-            return self.rpc.call('new_oids')
-        else:
-            return self.rpc.call('new_oids', n)
-
     def pack(self, t, wait=None):
         if wait is None:
             self.rpc.call('pack', t)
@@ -130,17 +124,17 @@
     def storea(self, oid, serial, data, version, id):
         self.rpc.callAsync('storea', oid, serial, data, version, id)
 
-    def tpc_begin(self, id, user, descr, ext, tid, status):
-        return self.rpc.call('tpc_begin', id, user, descr, ext, tid, status)
+    def tpcBegin(self, id, user, descr, ext, tid, status):
+        return self.rpc.call('tpcBegin', id, user, descr, ext, tid, status)
 
-    def vote(self, trans_id):
-        return self.rpc.call('vote', trans_id)
+    def tpcVote(self, trans_id):
+        return self.rpc.call('tpcVote', trans_id)
 
-    def tpc_finish(self, id):
-        return self.rpc.call('tpc_finish', id)
+    def tpcFinish(self, id):
+        return self.rpc.call('tpcFinish', id)
 
-    def tpc_abort(self, id):
-        self.rpc.callAsync('tpc_abort', id)
+    def tpcAbort(self, id):
+        self.rpc.callAsync('tpcAbort', id)
 
     def abortVersion(self, src, id):
         return self.rpc.call('abortVersion', src, id)
@@ -163,7 +157,13 @@
     def modifiedInVersion(self, oid):
         return self.rpc.call('modifiedInVersion', oid)
 
-    def new_oid(self, last=None):
+    def newObjectIds(self, n=None):
+        if n is None:
+            return self.rpc.call('newObjectIds')
+        else:
+            return self.rpc.call('newObjectIds', n)
+
+    def newObjectId(self, last=None):
         if last is None:
             return self.rpc.call('new_oid')
         else:
@@ -172,11 +172,8 @@
     def store(self, oid, serial, data, version, trans):
         return self.rpc.call('store', oid, serial, data, version, trans)
 
-    def transactionalUndo(self, trans_id, trans):
-        return self.rpc.call('transactionalUndo', trans_id, trans)
-
-    def undo(self, trans_id):
-        return self.rpc.call('undo', trans_id)
+    def undo(self, trans_id, trans):
+        return self.rpc.call('undo', trans_id, trans)
 
     def undoLog(self, first, last):
         return self.rpc.call('undoLog', first, last)


=== Zope3/src/zodb/zeo/server.py 1.5 => 1.6 ===
--- Zope3/src/zodb/zeo/server.py:1.5	Tue Feb  4 11:05:19 2003
+++ Zope3/src/zodb/zeo/server.py	Wed Feb  5 18:28:24 2003
@@ -34,8 +34,10 @@
 from zodb.zeo.zrpc.connection import ManagedServerConnection, Delay, MTDelay
 
 from zodb.ztransaction import Transaction
-from zodb.interfaces import StorageError, StorageTransactionError
-from zodb.interfaces import TransactionError, ReadOnlyError
+from zodb.interfaces import TransactionError
+from zodb.storage.interfaces import *
+
+from zope.interface.implements import objectImplements
 
 class StorageServerError(StorageError):
     """Error reported when an unpickleable exception is raised."""
@@ -301,11 +303,10 @@
         self.server.register_connection(storage_id, self)
 
     def get_info(self):
-        return {'name': self.storage.getName(),
-                'supportsVersions': self.storage.supportsVersions(),
-                'supportsTransactionalUndo':
-                self.storage.supportsTransactionalUndo(),
-                'extensionMethods': self.getExtensionMethods(),
+        return {"name": self.storage.getName(),
+                "implements": [iface.__name__
+                               for iface in objectImplements(self.storage)],
+                "extensionMethods": self.getExtensionMethods(),
                 }
 
     def getExtensionMethods(self):
@@ -386,13 +387,13 @@
         # Broadcast new size statistics
         self.server.invalidate(0, self.storage_id, ())
 
-    def new_oids(self, n=100):
+    def newObjectIds(self, n=100):
         """Return a sequence of n new oids, where n defaults to 100"""
         if self.read_only:
             raise ReadOnlyError()
         if n <= 0:
             n = 1
-        return [self.storage.new_oid() for i in range(n)]
+        return [self.storage.newObjectId() for i in range(n)]
 
     def undo(self, transaction_id):
         if self.read_only:
@@ -412,7 +413,7 @@
     def undoLog(self, first, last):
         return run_in_thread(self.storage.undoLog, first, last)
 
-    def tpc_begin(self, id, user, description, ext, tid, status):
+    def tpcBegin(self, id, user, description, ext, tid, status):
         if self.read_only:
             raise ReadOnlyError()
         if self.transaction is not None:
@@ -434,13 +435,13 @@
         t.description = description
         t._extension = ext
 
-        self.strategy.tpc_begin(t, tid, status)
+        self.strategy.tpcBegin(t, tid, status)
         self.transaction = t
 
-    def tpc_finish(self, id):
+    def tpcFinish(self, id):
         if not self.check_tid(id):
             return
-        invalidated = self.strategy.tpc_finish()
+        invalidated = self.strategy.tpcFinish()
         if invalidated:
             self.server.invalidate(self, self.storage_id,
                                    invalidated)
@@ -448,11 +449,11 @@
         self.strategy = None
         self.handle_waiting()
 
-    def tpc_abort(self, id):
+    def tpcAbort(self, id):
         if not self.check_tid(id):
             return
         strategy = self.strategy
-        strategy.tpc_abort()
+        strategy.tpcAbort()
         self.transaction = None
         self.strategy = None
         self.handle_waiting()
@@ -469,9 +470,9 @@
         self.check_tid(id, exc=StorageTransactionError)
         self.strategy.store(oid, serial, data, version)
 
-    def vote(self, id):
+    def tpcVote(self, id):
         self.check_tid(id, exc=StorageTransactionError)
-        return self.strategy.tpc_vote()
+        return self.strategy.tpcVote()
 
     def abortVersion(self, src, id):
         self.check_tid(id, exc=StorageTransactionError)
@@ -481,9 +482,9 @@
         self.check_tid(id, exc=StorageTransactionError)
         return self.strategy.commitVersion(src, dest)
 
-    def transactionalUndo(self, trans_id, id):
+    def undo(self, trans_id, id):
         self.check_tid(id, exc=StorageTransactionError)
-        return self.strategy.transactionalUndo(trans_id)
+        return self.strategy.undo(trans_id)
 
     # When a delayed transaction is restarted, the dance is
     # complicated.  The restart occurs when one ZEOStorage instance
@@ -575,7 +576,7 @@
     # This isn't a proper Zope interface, because I don't want to
     # introduce a dependency between ZODB and Zope interfaces.
 
-    def tpc_begin(self, trans, tid, status): pass
+    def tpcBegin(self, trans, tid, status): pass
 
     def store(self, oid, serial, data, version): pass
 
@@ -584,13 +585,13 @@
     def commitVersion(self, src, dest): pass
 
     # the trans_id arg to transactionalUndo is not the current txn's id
-    def transactionalUndo(self, trans_id): pass
+    def undo(self, trans_id): pass
 
-    def tpc_vote(self): pass
+    def tpcVote(self): pass
 
-    def tpc_abort(self): pass
+    def tpcAbort(self): pass
 
-    def tpc_finish(self): pass
+    def tpcFinish(self): pass
 
     # What to do if a connection is closed in mid-transaction
     def abort(self, zeo_storage): pass
@@ -606,21 +607,21 @@
         self.logger = logger
         self.log_label = log_label
 
-    def tpc_begin(self, txn, tid, status):
+    def tpcBegin(self, txn, tid, status):
         self.txn = txn
-        self.storage.tpc_begin(txn, tid, status)
+        self.storage.tpcBegin(txn, tid, status)
 
-    def tpc_vote(self):
+    def tpcVote(self):
         # send all the serialnos as a batch
         self.client.serialnos(self.serials)
-        return self.storage.tpc_vote(self.txn)
+        return self.storage.tpcVote(self.txn)
 
-    def tpc_finish(self):
-        self.storage.tpc_finish(self.txn)
+    def tpcFinish(self):
+        self.storage.tpcFinish(self.txn)
         return self.invalidated
 
-    def tpc_abort(self):
-        self.storage.tpc_abort(self.txn)
+    def tpcAbort(self):
+        self.storage.tpcAbort(self.txn)
 
     def store(self, oid, serial, data, version):
         try:
@@ -666,14 +667,14 @@
         self.invalidated.extend(inv)
         return oids
 
-    def transactionalUndo(self, trans_id):
-        oids = self.storage.transactionalUndo(trans_id, self.txn)
+    def undo(self, trans_id):
+        oids = self.storage.undo(trans_id, self.txn)
         inv = [(oid, None) for oid in oids]
         self.invalidated.extend(inv)
         return oids
 
     def abort(self, zeo_storage):
-        self.tpc_abort()
+        self.tpcAbort()
         zeo_storage.handle_waiting()
 
 class DelayedCommitStrategy:
@@ -689,7 +690,7 @@
         self.name = None
         self.args = None
 
-    def tpc_begin(self, txn, tid, status):
+    def tpcBegin(self, txn, tid, status):
         self.txn = txn
         self.tid = tid
         self.status = status
@@ -697,19 +698,19 @@
     def store(self, oid, serial, data, version):
         self.log.store(oid, serial, data, version)
 
-    def tpc_abort(self):
+    def tpcAbort(self):
         pass # just forget about this strategy
 
-    def tpc_finish(self):
-        # There has to be a tpc_vote() call before tpc_finish() is
+    def tpcFinish(self):
+        # There has to be a tpcVote() call before tpcFinish() is
         # called, and tpc_vote() always blocks, so a proper
-        # tpc_finish() call will always be sent to the immediate
+        # tpcFinish() call will always be sent to the immediate
         # commit strategy object.  So, if we get here, it means no
-        # call to tpc_vote() was made, which is a bug in the caller.
+        # call to tpcVote() was made, which is a bug in the caller.
         raise RuntimeError, "Logic error.  This method must not be called."
 
-    def tpc_vote(self):
-        self.name = "tpc_vote"
+    def tpcVote(self):
+        self.name = "tpcVote"
         self.args = ()
         return self.block()
 
@@ -723,15 +724,15 @@
         self.args = src,
         return self.block()
 
-    def transactionalUndo(self, trans_id):
-        self.name = "transactionalUndo"
+    def undo(self, trans_id):
+        self.name = "undo"
         self.args = trans_id,
         return self.block()
 
     def restart(self, new_strategy):
         # called by the storage when the storage is available
         assert isinstance(new_strategy, ImmediateCommitStrategy)
-        new_strategy.tpc_begin(self.txn, self.tid, self.status)
+        new_strategy.tpcBegin(self.txn, self.tid, self.status)
         loads, loader = self.log.get_loader()
         for i in range(loads):
             oid, serial, data, version = loader.load()


=== Zope3/src/zodb/zeo/client.py 1.3 => 1.4 ===
--- Zope3/src/zodb/zeo/client.py:1.3	Fri Jan 24 18:20:49 2003
+++ Zope3/src/zodb/zeo/client.py	Wed Feb  5 18:28:24 2003
@@ -41,7 +41,7 @@
 from zodb.zeo.tbuf import TransactionBuffer
 from zodb.zeo.zrpc.client import ConnectionManager
 
-from zodb import interfaces
+from zodb.storage.interfaces import *
 from zodb.timestamp import TimeStamp
 
 try:
@@ -49,7 +49,7 @@
 except ImportError:
     ResolvedSerial = 'rs'
 
-class ClientStorageError(interfaces.StorageError):
+class ClientStorageError(StorageError):
     """An error occured in the ZEO Client Storage."""
 
 class UnrecognizedResult(ClientStorageError):
@@ -95,7 +95,7 @@
     This is a faithful implementation of the Storage API.
 
     This class is thread-safe; transactions are serialized in
-    tpc_begin().
+    tpcBegin().
     """
 
     # Classes we instantiate.  A subclass might override.
@@ -105,6 +105,13 @@
     ConnectionManagerClass = ConnectionManager
     StorageServerStubClass = stubs.StorageServer
 
+    # The exact storage interfaces depend on the server that the client
+    # connects to.  We know that every storage must implement IStorage,
+    # but once connected we may change the instance's __implements__
+    # to reflect features available on the storage.
+    
+    __implements__ = IStorage
+
     def __init__(self, addr, storage='1', cache_size=20 * MB,
                  name='', client=None, debug=0, var=None,
                  min_disconnect_poll=5, max_disconnect_poll=300,
@@ -210,10 +217,8 @@
         # _server_addr is used by sortKey()
         self._server_addr = None
 
-        self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
-                      'supportsUndo':0, 'supportsVersions': 0,
-                      'supportsTransactionalUndo': 0}
-
+        self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client'}
+        
         self._tbuf = self.TransactionBufferClass()
         self._db = None
 
@@ -239,7 +244,7 @@
         # variable should only be modified while holding the
         # _oid_lock.
         self._oid_lock = threading.Lock()
-        self._oids = [] # Object ids retrieved from new_oids()
+        self._oids = [] # Object ids retrieved from newObjectIds()
 
         t = self._ts = get_timestamp()
         self._serial = `t`
@@ -325,7 +330,7 @@
         try:
             stub.register(str(self._storage), self._is_read_only)
             return 1
-        except interfaces.ReadOnlyError:
+        except ReadOnlyError:
             if not self._read_only_fallback:
                 raise
             self.logger.warn(
@@ -347,6 +352,7 @@
         stub = self.StorageServerStubClass(conn)
         self._oids = []
         self._info.update(stub.get_info())
+        self.update_interfaces()
         self.verify_cache(stub)
 
         # XXX The stub should be saved here and set in endVerify() below.
@@ -355,6 +361,16 @@
         self._connection = conn
         self._server = stub
 
+    def update_interfaces(self):
+        # Update instance's __implements__ based on the server.
+        L = [IStorage]
+        for name in self._info.get("implements", ()):
+            if name == "IUndoStorage":
+                L.append(IUndoStorage)
+            elif name == "IVersionStorage":
+                L.append(IVersionStorage)
+        self.__implements__ = tuple(L)
+
     def set_server_addr(self, addr):
         # Normalize server address and convert to string
         if isinstance(addr, types.StringType):
@@ -433,18 +449,6 @@
         """
         return self._info['extensionMethods']
 
-    def supportsUndo(self):
-        """Storage API: return whether we support undo."""
-        return self._info['supportsUndo']
-
-    def supportsVersions(self):
-        """Storage API: return whether we support versions."""
-        return self._info['supportsVersions']
-
-    def supportsTransactionalUndo(self):
-        """Storage API: return whether we support transactional undo."""
-        return self._info['supportsTransactionalUndo']
-
     def isReadOnly(self):
         """Storage API: return whether we are in read-only mode.
 
@@ -464,10 +468,9 @@
     def _check_trans(self, trans):
         """Internal helper to check a transaction argument for sanity."""
         if self._is_read_only:
-            raise interfaces.ReadOnlyError()
+            raise ReadOnlyError()
         if self._transaction is not trans:
-            raise interfaces.StorageTransactionError(self._transaction,
-                                                       trans)
+            raise StorageTransactionError(self._transaction, trans)
 
     def abortVersion(self, version, transaction):
         """Storage API: clear any changes made by the given version."""
@@ -550,15 +553,15 @@
             return v
         return self._server.modifiedInVersion(oid)
 
-    def new_oid(self):
+    def newObjectId(self):
         """Storage API: return a new object identifier."""
         if self._is_read_only:
-            raise interfaces.ReadOnlyError()
+            raise ReadOnlyError()
         # avoid multiple oid requests to server at the same time
         self._oid_lock.acquire()
         try:
             if not self._oids:
-                self._oids = self._server.new_oids()
+                self._oids = self._server.newObjectIds()
                 self._oids.reverse()
             return self._oids.pop()
         finally:
@@ -603,17 +606,17 @@
         self._tbuf.store(oid, version, data)
         return self._check_serials()
 
-    def tpc_vote(self, transaction):
+    def tpcVote(self, transaction):
         """Storage API: vote on a transaction."""
         if transaction is not self._transaction:
             return
-        self._server.vote(self._serial)
+        self._server.tpcVote(self._serial)
         return self._check_serials()
 
-    def tpc_begin(self, txn, tid=None, status=' '):
+    def tpcBegin(self, txn, tid=None, status=' '):
         """Storage API: begin a transaction."""
         if self._is_read_only:
-            raise interfaces.ReadOnlyError()
+            raise ReadOnlyError()
         self._tpc_cond.acquire()
         while self._transaction is not None:
             # It is allowable for a client to call two tpc_begins in a
@@ -634,7 +637,7 @@
             id = tid
 
         try:
-            self._server.tpc_begin(id, txn.user, txn.description,
+            self._server.tpcBegin(id, txn.user, txn.description,
                                    txn._extension, tid, status)
         except:
             # Client may have disconnected during the tpc_begin().
@@ -659,19 +662,19 @@
     def lastTransaction(self):
         return self._ltid
 
-    def tpc_abort(self, transaction):
+    def tpcAbort(self, transaction):
         """Storage API: abort a transaction."""
         if transaction is not self._transaction:
             return
         try:
-            self._server.tpc_abort(self._serial)
+            self._server.tpcAbort(self._serial)
             self._tbuf.clear()
             self._seriald.clear()
             del self._serials[:]
         finally:
             self.end_transaction()
 
-    def tpc_finish(self, transaction, f=None):
+    def tpcFinish(self, transaction, f=None):
         """Storage API: finish a transaction."""
         if transaction is not self._transaction:
             return
@@ -679,7 +682,7 @@
             if f is not None:
                 f()
 
-            self._server.tpc_finish(self._serial)
+            self._server.tpcFinish(self._serial)
 
             r = self._check_serials()
             assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
@@ -721,7 +724,7 @@
                 self._cache.update(oid, s, v, p)
         self._tbuf.clear()
 
-    def transactionalUndo(self, trans_id, trans):
+    def undo(self, trans_id, trans):
         """Storage API: undo a transaction.
 
         This is executed in a transactional context.  It has no effect
@@ -731,19 +734,9 @@
         a storage.
         """
         self._check_trans(trans)
-        oids = self._server.transactionalUndo(trans_id, self._serial)
+        oids = self._server.undo(trans_id, self._serial)
         for oid in oids:
             self._tbuf.invalidate(oid, '')
-        return oids
-
-    def undo(self, transaction_id):
-        """Storage API: undo a transaction, writing directly to the storage."""
-        if self._is_read_only:
-            raise interfaces.ReadOnlyError()
-        # XXX what are the sync issues here?
-        oids = self._server.undo(transaction_id)
-        for oid in oids:
-            self._cache.invalidate(oid, '')
         return oids
 
     def undoInfo(self, first=0, last=-20, specification=None):