[Zodb-checkins] SVN: ZODB/branches/jim-thready/src/Z checkpoint

Jim Fulton jim at zope.com
Fri Jan 15 14:39:11 EST 2010


Log message for revision 108159:
  checkpoint

Changed:
  U   ZODB/branches/jim-thready/src/ZEO/ClientStorage.py
  U   ZODB/branches/jim-thready/src/ZEO/CommitLog.py
  U   ZODB/branches/jim-thready/src/ZEO/ServerStub.py
  U   ZODB/branches/jim-thready/src/ZEO/StorageServer.py
  U   ZODB/branches/jim-thready/src/ZEO/tests/CommitLockTests.py
  U   ZODB/branches/jim-thready/src/ZEO/zrpc/connection.py
  U   ZODB/branches/jim-thready/src/ZEO/zrpc/trigger.py
  U   ZODB/branches/jim-thready/src/ZODB/Connection.py
  U   ZODB/branches/jim-thready/src/ZODB/tests/RevisionStorage.py
  U   ZODB/branches/jim-thready/src/ZODB/tests/StorageTestBase.py

-=-
Modified: ZODB/branches/jim-thready/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/ClientStorage.py	2010-01-15 17:39:53 UTC (rev 108158)
+++ ZODB/branches/jim-thready/src/ZEO/ClientStorage.py	2010-01-15 19:39:10 UTC (rev 108159)
@@ -1194,11 +1194,13 @@
         if self._cache is None:
             return
 
-        for oid, data in self._tbuf:
+        for oid, tid in self._seriald.iteritems():
             self._cache.invalidate(oid, tid, False)
+
+        for oid, data in self._tbuf:
+            s = self._seriald[oid] # assigning here asserts that oid in seriald
             # If data is None, we just invalidate.
             if data is not None:
-                s = self._seriald[oid]
                 if s != ResolvedSerial:
                     assert s == tid, (s, tid)
                     self._cache.store(oid, s, None, data)
@@ -1237,10 +1239,7 @@
 
         """
         self._check_trans(txn)
-        tid, oids = self._server.undo(trans_id, id(txn))
-        for oid in oids:
-            self._tbuf.invalidate(oid)
-        return tid, oids
+        self._server.undoa(trans_id, id(txn))
 
     def undoInfo(self, first=0, last=-20, specification=None):
         """Storage API: return undo information."""

Modified: ZODB/branches/jim-thready/src/ZEO/CommitLog.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/CommitLog.py	2010-01-15 17:39:53 UTC (rev 108158)
+++ ZODB/branches/jim-thready/src/ZEO/CommitLog.py	2010-01-15 19:39:10 UTC (rev 108159)
@@ -32,27 +32,31 @@
         self.pickler = cPickle.Pickler(self.file, 1)
         self.pickler.fast = 1
         self.stores = 0
-        self.read = 0
 
     def size(self):
         return self.file.tell()
 
     def delete(self, oid, serial):
-        self.pickler.dump(('d', oid, serial))
+        self.pickler.dump(('_delete', (oid, serial)))
         self.stores += 1
 
     def store(self, oid, serial, data):
-        self.pickler.dump(('s', oid, serial, data))
+        self.pickler.dump(('_store', (oid, serial, data)))
         self.stores += 1
 
     def restore(self, oid, serial, data, prev_txn):
-        self.pickler.dump(('r', oid, serial, data, prev_txn))
+        self.pickler.dump(('_restore', (oid, serial, data, prev_txn)))
         self.stores += 1
 
-    def get_loader(self):
-        self.read = 1
+    def undo(self, transaction_id):
+        self.pickler.dump(('_undo', (transaction_id, )))
+        self.stores += 1
+
+    def __iter__(self):
         self.file.seek(0)
-        return self.stores, cPickle.Unpickler(self.file)
+        unpickler = cPickle.Unpickler(self.file)
+        for i in range(self.stores):
+            yield unpickler.load()
 
     def close(self):
         if self.file:

Modified: ZODB/branches/jim-thready/src/ZEO/ServerStub.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/ServerStub.py	2010-01-15 17:39:53 UTC (rev 108158)
+++ ZODB/branches/jim-thready/src/ZEO/ServerStub.py	2010-01-15 19:39:10 UTC (rev 108159)
@@ -272,8 +272,8 @@
     def new_oid(self):
         return self.rpc.call('new_oid')
 
-    def undo(self, trans_id, trans):
-        return self.rpc.call('undo', trans_id, trans)
+    def undoa(self, trans_id, trans):
+        self.rpc.callAsync('undoa', trans_id, trans)
 
     def undoLog(self, first, last):
         return self.rpc.call('undoLog', first, last)

Modified: ZODB/branches/jim-thready/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/StorageServer.py	2010-01-15 17:39:53 UTC (rev 108158)
+++ ZODB/branches/jim-thready/src/ZEO/StorageServer.py	2010-01-15 19:39:10 UTC (rev 108159)
@@ -20,6 +20,8 @@
 exported for invocation by the server.
 """
 
+from __future__ import with_statement
+
 import asyncore
 import cPickle
 import logging
@@ -96,7 +98,6 @@
         self.transaction = None
         self.read_only = read_only
         self.locked = False             # Don't have storage lock
-        self.locked_lock = threading.Lock() # mediate locked access
         self.verifying = 0
         self.store_failed = 0
         self.log_label = _label
@@ -138,31 +139,8 @@
             label = str(host) + ":" + str(port)
         self.log_label = _label + "/" + label
 
-    def notifyLocked(self):
-        # We don't want to give a lock to a disconnected client and, we
-        # need to avoid a race of giving a lock to a client while it's
-        # disconecting. We check self.connection and set self.locked while
-        # the locked_lock is held, preventing self.connection from being
-        # set to None between the check and setting self.lock.
-        self.locked_lock.acquire()
-        try:
-            if self.connection is None:
-                return False # We've been disconnected. Don't take the lock
-            self.locked = True
-            # What happens if, before processing the trigger we, disconnect,
-            # reconnect, and start a new transaction?
-            # This isn't possible because we never reconnect!
-            self.connection.trigger.pull_trigger(self._restart)
-            return True
-        finally:
-            self.locked_lock.release()
-
     def notifyDisconnected(self):
-        self.locked_lock.acquire()
-        try:
-            self.connection = None
-        finally:
-            self.locked_lock.release()
+        self.connection = None
 
         # When this storage closes, we must ensure that it aborts
         # any pending transaction.
@@ -188,6 +166,7 @@
     def setup_delegation(self):
         """Delegate several methods to the storage
         """
+        # Called from register
 
         storage = self.storage
 
@@ -195,9 +174,6 @@
 
         if not info['supportsUndo']:
             self.undoLog = self.undoInfo = lambda *a,**k: ()
-            def undo(*a, **k):
-                raise NotImplementedError
-            self.undo = undo
 
         self.getTid = storage.getTid
         self.load = storage.load
@@ -300,13 +276,10 @@
     def get_info(self):
         storage = self.storage
 
-        try:
-            supportsUndo = storage.supportsUndo
-        except AttributeError:
-            supportsUndo = False
-        else:
-            supportsUndo = supportsUndo()
 
+        supportsUndo = (getattr(storage, 'supportsUndo', lambda : False)()
+                        and self.connection.peer_protocol_version >= 'Z310')
+
         # Communicate the backend storage interfaces to the client
         storage_provides = zope.interface.providedBy(storage)
         interfaces = []
@@ -484,8 +457,9 @@
 
     def _clear_transaction(self):
         # Common code at end of tpc_finish() and tpc_abort()
-        self.server.unlock_storage(self)
-        self.locked = 0
+        if self.locked:
+            self.server.unlock_storage(self)
+            self.locked = 0
         self.transaction = None
         self.stats.active_txns -= 1
         if self.txnlog is not None:
@@ -495,49 +469,42 @@
                 ZODB.blob.remove_committed(blobfilename)
             del self.blob_log
 
-    # The following two methods return values, so they must acquire
-    # the storage lock and begin the transaction before returning.
-
-    # It's a bit vile that undo can cause us to get the lock before vote.
-
-    def undo(self, trans_id, tid):
-        self._check_tid(tid, exc=StorageTransactionError)
-
-        if self.txnlog is not None:
-            return self._wait(lambda: self._undo(trans_id))
-        else:
-            return self._undo(trans_id)
-
     def vote(self, tid):
         self._check_tid(tid, exc=StorageTransactionError)
+        return self._try_to_vote()
 
-        if self.txnlog is not None:
-            return self._wait(lambda: self._vote())
+    def _try_to_vote(self, delay=None):
+        if self.connection is None:
+            return # We're disconnected
+        self.locked = self.server.lock_storage(self)
+        if self.locked:
+            self.log("(%r) unlock: transactions waiting: %s"
+                     % (self.storage_id, self.server.waiting(self)))
+            try:
+                self._vote()
+            except Exception:
+                if delay is not None:
+                    delay.error()
+                else:
+                    raise
+            else:
+                if delay is not None:
+                    delay.reply(None)
         else:
-            return self._vote()
+            if delay == None:
+                self.log("(%r) queue lock: transactions waiting: %s"
+                         % (self.storage_id, self.server.waiting(self)+1))
+                delay = Delay()
+            self.server.unlock_callback(self, delay)
+            return delay
 
+    def _unlock_callback(self, delay):
+        connection = self.connection
+        if connection is not None:
+            connection.trigger.pull_trigger(self._try_to_vote, delay)
 
-    _thunk = _delay = None
-    def _wait(self, thunk):
-        # Wait for the storage lock to be acquired.
-        assert self._thunk == self._delay == None
-        self._thunk = thunk
-        if self.server.lock_storage(self):
-            assert not self.tpc_transaction()
-            self.log("Transaction acquired storage lock.", BLATHER)
-            self.locked = True
-            return self._restart()
+    def _vote(self):
 
-        self._delay = d = Delay()
-        return d
-
-    def _restart(self):
-        if not self.locked:
-            # Must have been disconnected after locking
-            assert self.connection is None
-            return
-
-        # Restart when the storage lock is available.
         if self.txnlog.stores == 1:
             template = "Preparing to commit transaction: %d object, %d bytes"
         else:
@@ -552,22 +519,8 @@
             self.storage.tpc_begin(self.transaction)
 
         try:
-            loads, loader = self.txnlog.get_loader()
-            for i in range(loads):
-                store = loader.load()
-                store_type = store[0]
-                store_args = store[1:]
-
-                if store_type == 'd':
-                    do_store = self._delete
-                elif store_type == 's':
-                    do_store = self._store
-                elif store_type == 'r':
-                    do_store = self._restore
-                else:
-                    raise ValueError('Invalid store type: %r' % store_type)
-
-                if not do_store(*store_args):
+            for op, args in self.txnlog:
+                if not getattr(self, op)(*args):
                     break
 
             # Blob support
@@ -580,18 +533,16 @@
             self._clear_transaction()
             raise
 
-        thunk = self._thunk
-        delay = self._delay
-        self._thunk = self._delay = None
 
-        resp = thunk()
-        if delay is not None:
-            delay.reply(resp)
+        if not self.store_failed:
+            # Only call tpc_vote if no store call failed, otherwise
+            # the serialnos() call will deliver an exception that will be
+            # handled by the client in its tpc_vote() method.
+            serials = self.storage.tpc_vote(self.transaction)
+            if serials:
+                self.serials.extend(serials)
 
-        self.txnlog.close()
-        self.txnlog = None
-        del self.blob_log
-        return resp
+        self.client.serialnos(self.serials)
 
     # The public methods of the ZEO client API do not do the real work.
     # They defer work until after the storage lock has been acquired.
@@ -651,6 +602,13 @@
     def sendBlob(self, oid, serial):
         self.client.storeBlob(oid, serial, self.storage.loadBlob(oid, serial))
 
+    def undo(*a, **k):
+        raise NotImplementedError
+
+    def undoa(self, trans_id, tid):
+        self._check_tid(tid, exc=StorageTransactionError)
+        self.txnlog.undo(trans_id)
+
     def _delete(self, oid, serial):
         err = None
         try:
@@ -737,6 +695,27 @@
 
         return err is None
 
+    def _undo(self, trans_id):
+        err = None
+        try:
+            tid, oids = self.storage.undo(trans_id, self.transaction)
+        except (SystemExit, KeyboardInterrupt):
+            raise
+        except Exception, err:
+            self.store_failed = 1
+            if not isinstance(err, TransactionError):
+                # Unexpected errors are logged and passed to the client
+                self.log("store error: %s, %s" % sys.exc_info()[:2],
+                         logging.ERROR, exc_info=True)
+            err = self._marshal_error(err)
+            # The exception is reported back as newserial for this oid
+            self.serials.append((oid, err))
+        else:
+            self.invalidated.extend(oids)
+            self.serials.extend((oid, ResolvedSerial) for oid in oids)
+
+        return err is None
+
     def _marshal_error(self, error):
         # Try to pickle the exception.  If it can't be pickled,
         # the RPC response would fail, so use something that can be pickled.
@@ -750,24 +729,6 @@
             error = StorageServerError(msg)
         return error
 
-    def _vote(self):
-        assert self.locked
-        if not self.store_failed:
-            # Only call tpc_vote of no store call failed, otherwise
-            # the serialnos() call will deliver an exception that will be
-            # handled by the client in its tpc_vote() method.
-            serials = self.storage.tpc_vote(self.transaction)
-            if serials:
-                self.serials.extend(serials)
-
-        self.client.serialnos(self.serials)
-        return
-
-    def _undo(self, trans_id):
-        tid, oids = self.storage.undo(trans_id, self.transaction)
-        self.invalidated.extend(oids)
-        return tid, oids
-
     # IStorageIteration support
 
     def iterator_start(self, start, stop):
@@ -947,9 +908,11 @@
         log("%s created %s with storages: %s" %
             (self.__class__.__name__, read_only and "RO" or "RW", msg))
 
-        self.lockers = dict((name, []) for name in storages)
-        self.lockers_lock = threading.Lock()
 
+        self._lock = threading.Lock()
+        self._commit_locks = {}
+        self._unlock_callbacks = dict((name, []) for name in storages)
+
         self.read_only = read_only
         self.auth_protocol = auth_protocol
         self.auth_database = auth_database
@@ -1236,50 +1199,45 @@
                 cl.remove(conn.obj)
 
     def lock_storage(self, zeostore):
-        self.lockers_lock.acquire()
-        try:
-            storage_id = zeostore.storage_id
-            lockers = self.lockers[storage_id]
-            lockers.append(zeostore)
-            if len(lockers) == 1:
-                self.timeouts[storage_id].begin(zeostore)
-                self.stats[storage_id].lock_time = time.time()
-                return True
-            else:
-                zeostore.log("(%r) queue lock: transactions waiting: %s"
-                             % (storage_id, len(lockers)-1))
-        finally:
-            self.lockers_lock.release()
+        storage_id = zeostore.storage_id
+        with self._lock:
+            if storage_id in self._commit_locks:
+                return False
+            self._commit_locks[storage_id] = zeostore
+            self.timeouts[storage_id].begin(zeostore)
+            self.stats[storage_id].lock_time = time.time()
+        return True
 
     def unlock_storage(self, zeostore):
-        self.lockers_lock.acquire()
-        try:
-            storage_id = zeostore.storage_id
-            lockers = self.lockers[storage_id]
-            if zeostore in lockers:
-                if lockers[0] == zeostore:
-                    self.timeouts[storage_id].end(zeostore)
-                    self.stats[storage_id].lock_time = None
-                    lockers.pop(0)
-                    while lockers:
-                        zeostore.log("(%r) unlock: transactions waiting: %s"
-                                     % (storage_id, len(lockers)-1))
-                        zeostore = lockers[0]
-                        if zeostore.notifyLocked():
-                            self.timeouts[storage_id].begin(zeostore)
-                            self.stats[storage_id].lock_time = time.time()
-                            break
-                        else:
-                            # The queued client was closed, so dequeue it
-                            lockers.pop(0)
-                else:
-                    lockers.remove(zeostore)
-                    if lockers:
-                        zeostore.log("(%r) dequeue: transactions waiting: %s"
-                                     % (storage_id, len(lockers)-1))
-        finally:
-            self.lockers_lock.release()
+        storage_id = zeostore.storage_id
+        with self._lock:
+            assert self._commit_locks[storage_id] is zeostore
+            del self._commit_locks[storage_id]
+            self.timeouts[storage_id].end(zeostore)
+            self.stats[storage_id].lock_time = None
+            callbacks = self._unlock_callbacks[storage_id][:]
+            del self._unlock_callbacks[storage_id][:]
 
+        if callbacks:
+            zeostore.log("(%r) unlock: transactions waiting: %s"
+                         % (storage_id, len(callbacks)-1))
+
+            for zeostore, delay in callbacks:
+                try:
+                    zeostore._unlock_callback(delay)
+                except (SystemExit, KeyboardInterrupt):
+                    raise
+                except Exception:
+                    logger.exception("Calling unlock callback")
+
+    def unlock_callback(self, zeostore, delay):
+        storage_id = zeostore.storage_id
+        with self._lock:
+            self._unlock_callbacks[storage_id].append((zeostore, delay))
+
+    def waiting(self, zeostore):
+        return len(self._unlock_callbacks[zeostore.storage_id])
+
 class StubTimeoutThread:
 
     def begin(self, client):
@@ -1343,8 +1301,7 @@
             if howlong <= 0:
                 client.log("Transaction timeout after %s seconds" %
                            self._timeout)
-                client.connection.trigger.pull_trigger(
-                    lambda: client.connection.close())
+                client.connection.trigger.pull_trigger(client.connection.close)
             else:
                 time.sleep(howlong)
 

Modified: ZODB/branches/jim-thready/src/ZEO/tests/CommitLockTests.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/tests/CommitLockTests.py	2010-01-15 17:39:53 UTC (rev 108158)
+++ ZODB/branches/jim-thready/src/ZEO/tests/CommitLockTests.py	2010-01-15 19:39:10 UTC (rev 108159)
@@ -181,64 +181,3 @@
 
         self._finish_threads()
         self._cleanup()
-
-class CommitLockUndoTests(CommitLockTests):
-
-    def _get_trans_id(self):
-        self._dostore()
-        L = self._storage.undoInfo()
-        return L[0]['id']
-
-    def _begin_undo(self, trans_id, txn):
-        rpc = self._storage._server.rpc
-        return rpc._deferred_call('undo', trans_id, id(txn))
-
-    def _finish_undo(self, msgid):
-        return self._storage._server.rpc._deferred_wait(msgid)
-
-    def checkCommitLockUndoFinish(self):
-        trans_id = self._get_trans_id()
-        oid, txn = self._start_txn()
-        msgid = self._begin_undo(trans_id, txn)
-
-        self._begin_threads()
-
-        self._finish_undo(msgid)
-        self._storage.tpc_vote(txn)
-        self._storage.tpc_finish(txn)
-        self._storage.load(oid, '')
-
-        self._finish_threads()
-
-        self._dostore()
-        self._cleanup()
-
-    def checkCommitLockUndoAbort(self):
-        trans_id = self._get_trans_id()
-        oid, txn = self._start_txn()
-        msgid = self._begin_undo(trans_id, txn)
-
-        self._begin_threads()
-
-        self._finish_undo(msgid)
-        self._storage.tpc_vote(txn)
-        self._storage.tpc_abort(txn)
-
-        self._finish_threads()
-
-        self._dostore()
-        self._cleanup()
-
-    def checkCommitLockUndoClose(self):
-        trans_id = self._get_trans_id()
-        oid, txn = self._start_txn()
-        msgid = self._begin_undo(trans_id, txn)
-        self._begin_threads()
-
-        self._finish_undo(msgid)
-        self._storage.tpc_vote(txn)
-        self._storage.close()
-
-        self._finish_threads()
-
-        self._cleanup()

Modified: ZODB/branches/jim-thready/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/zrpc/connection.py	2010-01-15 17:39:53 UTC (rev 108158)
+++ ZODB/branches/jim-thready/src/ZEO/zrpc/connection.py	2010-01-15 19:39:10 UTC (rev 108159)
@@ -187,7 +187,9 @@
     def reply(self, obj):
         self.send_reply(self.msgid, obj)
 
-    def error(self, exc_info):
+    def error(self, exc_info=None):
+        if exc_info is None:
+            exc_info = sys.exc_info()
         log("Error raised in delayed method", logging.ERROR, exc_info=True)
         self.return_error(self.msgid, 0, *exc_info[:2])
 
@@ -359,15 +361,17 @@
 
     # Protocol variables:
     # Our preferred protocol.
-    current_protocol = "Z309"
+    current_protocol = "Z310"
 
     # If we're a client, an exhaustive list of the server protocols we
     # can accept.
-    servers_we_can_talk_to = ["Z308", current_protocol]
+    servers_we_can_talk_to = ["Z308", "Z309", current_protocol]
 
     # If we're a server, an exhaustive list of the client protocols we
     # can accept.
-    clients_we_can_talk_to = ["Z200", "Z201", "Z303", "Z308", current_protocol]
+    clients_we_can_talk_to = [
+        "Z200", "Z201", "Z303", "Z308", "Z309",
+        current_protocol]
 
     # This is pretty excruciating.  Details:
     #

Modified: ZODB/branches/jim-thready/src/ZEO/zrpc/trigger.py
===================================================================
--- ZODB/branches/jim-thready/src/ZEO/zrpc/trigger.py	2010-01-15 17:39:53 UTC (rev 108158)
+++ ZODB/branches/jim-thready/src/ZEO/zrpc/trigger.py	2010-01-15 19:39:10 UTC (rev 108159)
@@ -12,6 +12,8 @@
 #
 ##############################################################################
 
+from __future__ import with_statement
+
 import asyncore
 import os
 import socket
@@ -95,14 +97,15 @@
     def _close(self):    # see close() above; subclass must supply
         raise NotImplementedError
 
-    def pull_trigger(self, thunk=None):
+    def pull_trigger(self, *thunk):
         if thunk:
-            self.lock.acquire()
-            try:
+            with self.lock:
                 self.thunks.append(thunk)
-            finally:
-                self.lock.release()
-        self._physical_pull()
+        try:
+            self._physical_pull()
+        except Exception:
+            if not self._closed:
+                raise
 
     # Subclass must supply _physical_pull, which does whatever the OS
     # needs to do to provoke the "write" end of the trigger.
@@ -114,19 +117,20 @@
             self.recv(8192)
         except socket.error:
             return
-        self.lock.acquire()
-        try:
-            for thunk in self.thunks:
-                try:
-                    thunk()
-                except:
-                    nil, t, v, tbinfo = asyncore.compact_traceback()
-                    print ('exception in trigger thunk:'
-                           ' (%s:%s %s)' % (t, v, tbinfo))
-            self.thunks = []
-        finally:
-            self.lock.release()
 
+        while 1:
+            with self.lock:
+                if self.thunks:
+                    thunk = self.thunks.pop(0)
+                else:
+                    return
+            try:
+                thunk[0](*thunk[1:])
+            except:
+                nil, t, v, tbinfo = asyncore.compact_traceback()
+                print ('exception in trigger thunk:'
+                       ' (%s:%s %s)' % (t, v, tbinfo))
+
     def __repr__(self):
         return '<select-trigger (%s) at %x>' % (self.kind, positive_id(self))
 

Modified: ZODB/branches/jim-thready/src/ZODB/Connection.py
===================================================================
--- ZODB/branches/jim-thready/src/ZODB/Connection.py	2010-01-15 17:39:53 UTC (rev 108158)
+++ ZODB/branches/jim-thready/src/ZODB/Connection.py	2010-01-15 19:39:10 UTC (rev 108159)
@@ -666,32 +666,11 @@
             self._cache.update_object_size_estimation(oid, len(p))
             obj._p_estimated_size = len(p)
 
-            self._handle_serial(s, oid)
+            self._handle_serial(oid, s)
 
-    def _handle_serial(self, store_return, oid=None, change=1):
-        """Handle the returns from store() and tpc_vote() calls."""
-
-        # These calls can return different types depending on whether
-        # ZEO is used.  ZEO uses asynchronous returns that may be
-        # returned in batches by the ClientStorage.  ZEO1 can also
-        # return an exception object and expect that the Connection
-        # will raise the exception.
-
-        # When conflict resolution occurs, the object state held by
-        # the connection does not match what is written to the
-        # database.  Invalidate the object here to guarantee that
-        # the new state is read the next time the object is used.
-
-        if not store_return:
+    def _handle_serial(self, oid, serial, change=True):
+        if not serial:
             return
-        if isinstance(store_return, str):
-            assert oid is not None
-            self._handle_one_serial(oid, store_return, change)
-        else:
-            for oid, serial in store_return:
-                self._handle_one_serial(oid, serial, change)
-
-    def _handle_one_serial(self, oid, serial, change):
         if not isinstance(serial, str):
             raise serial
         obj = self._cache.get(oid, None)
@@ -757,7 +736,9 @@
         except AttributeError:
             return
         s = vote(transaction)
-        self._handle_serial(s)
+        if s:
+            for oid, serial in s:
+                self._handle_serial(oid, serial)
 
     def tpc_finish(self, transaction):
         """Indicate confirmation that the transaction is done."""
@@ -1171,7 +1152,7 @@
                 s = self._storage.store(oid, serial, data,
                                         '', transaction)
 
-            self._handle_serial(s, oid, change=False)
+            self._handle_serial(oid, s, change=False)
         src.close()
 
     def _abort_savepoint(self):

Modified: ZODB/branches/jim-thready/src/ZODB/tests/RevisionStorage.py
===================================================================
--- ZODB/branches/jim-thready/src/ZODB/tests/RevisionStorage.py	2010-01-15 17:39:53 UTC (rev 108158)
+++ ZODB/branches/jim-thready/src/ZODB/tests/RevisionStorage.py	2010-01-15 19:39:10 UTC (rev 108159)
@@ -122,7 +122,7 @@
             tid = info[0]["id"]
             # Always undo the most recent txn, so the value will
             # alternate between 3 and 4.
-            self._undo(tid, [oid], note="undo %d" % i)
+            self._undo(tid, note="undo %d" % i)
             revs.append(self._storage.load(oid, ""))
 
         prev_tid = None

Modified: ZODB/branches/jim-thready/src/ZODB/tests/StorageTestBase.py
===================================================================
--- ZODB/branches/jim-thready/src/ZODB/tests/StorageTestBase.py	2010-01-15 17:39:53 UTC (rev 108158)
+++ ZODB/branches/jim-thready/src/ZODB/tests/StorageTestBase.py	2010-01-15 19:39:10 UTC (rev 108159)
@@ -209,10 +209,11 @@
         t = transaction.Transaction()
         t.note(note or "undo")
         self._storage.tpc_begin(t)
-        tid, oids = self._storage.undo(tid, t)
+        undo_result = self._storage.undo(tid, t)
         self._storage.tpc_vote(t)
         self._storage.tpc_finish(t)
         if expected_oids is not None:
+            oids = undo_result[1]
             self.assertEqual(len(oids), len(expected_oids), repr(oids))
             for oid in expected_oids:
                 self.assert_(oid in oids)



More information about the Zodb-checkins mailing list