[Zope3-checkins] CVS: Zope3/src/zodb - connection.py:1.6 db.py:1.5 interfaces.py:1.6

Jeremy Hylton jeremy@zope.com
Tue, 28 Jan 2003 13:17:32 -0500


Update of /cvs-repository/Zope3/src/zodb
In directory cvs.zope.org:/tmp/cvs-serv3644/zodb

Modified Files:
	connection.py db.py interfaces.py 
Log Message:
Refactor Connection.

Delete unused methods, although some of these may need to come back if
Zope3 grows a service that wants to use them.  Removed: oldstate(),
onCloseCallbacks(), applyCloseCallbacks(), cacheFullSweep(),
cacheMinimze().

Organize other methods into groups according to the interface they
implement.  Rename methods that don't implement some interface so that
they start with an underscore.

Add sync() to IAppConnection.

Add __implements__ clauses about transaction.interfaces.


=== Zope3/src/zodb/connection.py 1.5 => 1.6 ===
--- Zope3/src/zodb/connection.py:1.5	Tue Jan 28 11:43:54 2003
+++ Zope3/src/zodb/connection.py	Tue Jan 28 13:17:30 2003
@@ -60,6 +60,7 @@
 from zodb.utils import p64, u64, Set, z64
 
 from transaction import get_transaction
+import transaction.interfaces
 from persistence.cache import Cache
 from persistence.interfaces import IPersistentDataManager
 
@@ -74,7 +75,8 @@
     storage.
     """
 
-    __implements__ = IAppConnection, IConnection, IPersistentDataManager
+    __implements__ = (IAppConnection, IConnection, IPersistentDataManager,
+                      transaction.interfaces.IDataManager)
 
     def __init__(self, db, version='', cache_size=400):
         self._db = db
@@ -104,9 +106,20 @@
         # new_oid is used by serialize
         self.new_oid = self._storage.new_oid
 
+    ######################################################################
+    # IAppConnection defines the next two methods
+    # root() and sync()
+    
     def root(self):
         return self.get(z64)
 
+    def sync(self):
+        get_transaction().abort()
+        sync = getattr(self._storage, 'sync', None)
+        if sync is not None:
+            sync()
+        self._flush_invalidations()
+
     def modifiedInVersion(self, oid):
         try:
             return self._storage.modifiedInVersion(oid)
@@ -242,39 +255,85 @@
 
     def close(self):
         self._cache.incrgc()
-        self.applyCloseCallbacks()
         self._opened = None
         # Return the connection to the pool.
         self._db._closeConnection(self)
 
-    # XXX not sure what the callbacks are for, but they're used by Mount
+    ######################################################################
+    # transaction.interfaces.IDataManager requires the next four methods
+    # prepare(), abort(), commit(), savepoint()
 
-    __onCloseCallbacks = None
+    def prepare(self, txn):
+        self._modified.clear()
+        self._created.clear()
+        if self._tmp is not None:
+            # commit_sub() will call tpc_begin() on the real storage
+            self._commit_sub(txn)
+        else:
+            self._storage.tpc_begin(txn)
 
-    def onCloseCallback(self, f):
-        if self.__onCloseCallbacks is None:
-            self.__onCloseCallbacks = []
-        self.__onCloseCallbacks.append(f)
-
-    def applyCloseCallbacks(self):
-        # Call the close callbacks.
-        if self.__onCloseCallbacks is not None:
-            for f in self.__onCloseCallbacks:
-                try:
-                    f()
-                except:
-                    f_self = getattr(f, 'im_self', f)
-                    self._logger.exception("Close callback failed for %s",
-                                           f_self)
-            self.__onCloseCallbacks = None
+        for obj in self._txns.get(txn, ()):
+            self._objcommit(obj, txn)
 
-    # some cache-related methods
+        s = self._storage.tpc_vote(txn)
+        self._handle_serial(s)
+        return True
 
-    def cacheFullSweep(self, dt=0):
-        self._cache.full_sweep(dt)
+    def abort(self, txn):
+        # XXX need test to make sure it is safe to call abort()
+        # without calling prepare()
+        if self._tmp is not None:
+            self._abort_sub()
+        self._storage.tpc_abort(txn)
 
-    def cacheMinimize(self, dt=0):
-        self._cache.minimize(dt)
+        objs = self._txns.get(txn)
+        if objs is not None:
+            self._cache.invalidateMany([obj._p_oid for obj in objs])
+            del self._txns[txn]
+        self._flush_invalidations()
+        self._cache.invalidateMany(self._modified)
+        self._invalidate_created(self._created)
+        self._created = Set()
+        self._modified.clear()
+
+    def commit(self, txn):
+        # It's important that the storage call the function we pass
+        # (self._invalidate_modified) while it still has its
+        # lock.  We don't want another thread to be able to read any
+        # updated data until we've had a chance to send an
+        # invalidation message to all of the other connections!
+
+        self._db.begin_invalidation()
+        # XXX We should really have a try/finally because the begin
+        # call acquired a lock that will only be released in
+        # _invalidate_modified().
+        self._storage.tpc_finish(txn, self._invalidate_modified)
+        try:
+            del self._txns[txn]
+        except KeyError:
+            pass
+
+        self._flush_invalidations()
+
+    def savepoint(self, txn):
+        if self._tmp is None:
+            tmp = TmpStore(self._version)
+            self._tmp = self._storage
+            self._storage = tmp
+            tmp.registerDB(self._db)
+        self._modified = Set()
+        self._created = Set()
+        self._storage.tpc_begin(txn)
+
+        for obj in self._txns.get(txn, ()):
+            self._objcommit(obj, txn)
+        self.importHook(txn) # hook for ExportImport
+
+        # The tpc_finish() of TmpStore returns an UndoInfo object.
+        undo = self._storage.tpc_finish(txn)
+        self._storage._created = self._created
+        self._created = Set()
+        return Rollback(self, undo)
 
     def _flush_invalidations(self):
         self._inv_lock.acquire()
@@ -286,7 +345,49 @@
         # Now is a good time to collect some garbage
         self._cache.incrgc()
 
-    def objcommit(self, object, transaction):
+    def _handle_serial(self, store_return, oid=None, change=True):
+        """Handle the returns from store() and tpc_vote() calls."""
+
+        # These calls can return different types depending on whether
+        # ZEO is used.  ZEO uses asynchronous returns that may be
+        # returned in batches by the ClientStorage.  ZEO1 can also
+        # return an exception object and expect that the Connection
+        # will raise the exception.
+
+        # When _commit_sub() exceutes a store, there is no need to
+        # update the _p_changed flag, because the subtransaction
+        # tpc_vote() calls already did this.  The change=1 argument
+        # exists to allow _commit_sub() to avoid setting the flag
+        # again.
+        if not store_return:
+            return
+        if isinstance(store_return, StringType):
+            assert oid is not None
+            serial = store_return
+            obj = self._cache.get(oid, None)
+            if obj is None:
+                return
+            if serial == ResolvedSerial:
+                obj._p_changed = None
+            else:
+                if change:
+                    obj._p_changed = 0
+                obj._p_serial = serial
+        else:
+            for oid, serial in store_return:
+                if not isinstance(serial, StringType):
+                    raise serial
+                obj = self._cache.get(oid, None)
+                if obj is None:
+                    continue
+                if serial == ResolvedSerial:
+                    obj._p_changed = None
+                else:
+                    if change:
+                        obj._p_changed = 0
+                    obj._p_serial = serial
+
+    def _objcommit(self, object, transaction):
         oid = object._p_oid
         self._logger.debug("commit object %s", u64(oid))
 
@@ -305,12 +406,9 @@
 
         writer = ObjectWriter(self)
         for obj in writer.newObjects(object):
-            self.commit_store(writer, obj, transaction)
+            self._commit_store(writer, obj, transaction)
 
-    def commit_store(self, writer, pobject, transaction):
-        # XXX the file and pickler get reset each time through...
-        # Maybe just create new ones each time?  Except I'm not sure
-        # how that interacts with the persistent_id attribute.
+    def _commit_store(self, writer, pobject, transaction):
         oid = pobject._p_oid
         serial = getattr(pobject, '_p_serial', None)
         if serial is None:
@@ -330,8 +428,8 @@
         self._cache[oid] = pobject
         self._handle_serial(s, oid)
 
-    def commit_sub(self, txn):
-        """Commit all work done in subtransactions"""
+    def _commit_sub(self, txn):
+        # Commit all work done in subtransactions.
         assert self._tmp is not None
 
         tmp = self._storage
@@ -347,10 +445,10 @@
         for oid in tmp._index:
             data, serial = tmp.load(oid, tmp._bver)
             s = self._storage.store(oid, serial, data, self._version, txn)
-            self._handle_serial(s, oid, change=0)
+            self._handle_serial(s, oid, change=False)
 
-    def abort_sub(self):
-        """Abort work done in subtransactions"""
+    def _abort_sub(self):
+        # Abort work done in subtransactions.
         assert self._tmp is not None
 
         tmp = self._storage
@@ -361,7 +459,7 @@
         self._invalidate_created(tmp._created)
 
     def _invalidate_created(self, created):
-        """Dis-own new objects from uncommitted transaction."""
+        # Dis-own new objects from uncommitted transaction.
         for oid in created:
             o = self._cache.get(oid)
             if o is not None:
@@ -369,144 +467,15 @@
                 del o._p_oid
                 del self._cache[oid]
 
-    ######################################################################
-    # Transaction.IDataManager
-
-    def oldstate(self, object, serial):
-        """Return the state of an object as of serial.
-
-        This routine is used by Zope's History facility.
-        """
-        p = self._storage.loadSerial(object._p_oid, serial)
-        return self._reader.getState(p)
-
-    def savepoint(self, txn):
-        if self._tmp is None:
-            tmp = TmpStore(self._version)
-            self._tmp = self._storage
-            self._storage = tmp
-            tmp.registerDB(self._db)
-        self._modified = Set()
-        self._created = Set()
-        self._storage.tpc_begin(txn)
-
-        for obj in self._txns.get(txn, ()):
-            self.objcommit(obj, txn)
-        self.importHook(txn) # hook for ExportImport
-
-        # The tpc_finish() of TmpStore returns an UndoInfo object.
-        undo = self._storage.tpc_finish(txn)
-        self._storage._created = self._created
-        self._created = Set()
-        return Rollback(self, undo)
-
-    def abort(self, txn):
-        # XXX need test to make sure it is safe to call abort()
-        # without calling prepare()
-        if self._tmp is not None:
-            self.abort_sub()
-        self._storage.tpc_abort(txn)
-
-        objs = self._txns.get(txn)
-        if objs is not None:
-            self._cache.invalidateMany([obj._p_oid for obj in objs])
-            del self._txns[txn]
-        self._flush_invalidations()
-        self._cache.invalidateMany(self._modified)
-        self._invalidate_created(self._created)
-        self._created = Set()
-        self._modified.clear()
-
-    def prepare(self, txn):
-        self._modified.clear()
-        self._created.clear()
-        if self._tmp is not None:
-            # commit_sub() will call tpc_begin() on the real storage
-            self.commit_sub(txn)
-        else:
-            self._storage.tpc_begin(txn)
-
-        for obj in self._txns.get(txn, ()):
-            self.objcommit(obj, txn)
-
-        s = self._storage.tpc_vote(txn)
-        self._handle_serial(s)
-        return True
-
-    def _handle_serial(self, store_return, oid=None, change=1):
-        """Handle the returns from store() and tpc_vote() calls."""
-
-        # These calls can return different types depending on whether
-        # ZEO is used.  ZEO uses asynchronous returns that may be
-        # returned in batches by the ClientStorage.  ZEO1 can also
-        # return an exception object and expect that the Connection
-        # will raise the exception.
-
-        # When commit_sub() exceutes a store, there is no need to
-        # update the _p_changed flag, because the subtransaction
-        # tpc_vote() calls already did this.  The change=1 argument
-        # exists to allow commit_sub() to avoid setting the flag
-        # again.
-        if not store_return:
-            return
-        if isinstance(store_return, StringType):
-            assert oid is not None
-            serial = store_return
-            obj = self._cache.get(oid, None)
-            if obj is None:
-                return
-            if serial == ResolvedSerial:
-                obj._p_changed = None
-            else:
-                if change:
-                    obj._p_changed = 0
-                obj._p_serial = serial
-        else:
-            for oid, serial in store_return:
-                if not isinstance(serial, StringType):
-                    raise serial
-                obj = self._cache.get(oid, None)
-                if obj is None:
-                    continue
-                if serial == ResolvedSerial:
-                    obj._p_changed = None
-                else:
-                    if change:
-                        obj._p_changed = 0
-                    obj._p_serial = serial
-
-    def commit(self, txn):
-        # It's important that the storage call the function we pass
-        # (self._invalidate_modified) while it still has its
-        # lock.  We don't want another thread to be able to read any
-        # updated data until we've had a chance to send an
-        # invalidation message to all of the other connections!
-
-        self._db.begin_invalidation()
-        # XXX We should really have a try/finally because the begin
-        # call acquired a lock that will only be released in
-        # _invalidate_modified().
-        self._storage.tpc_finish(txn, self._invalidate_modified)
-        try:
-            del self._txns[txn]
-        except KeyError:
-            pass
-
-        self._flush_invalidations()
-
     def _invalidate_modified(self):
+        # Called from the storage's tpc_finish() method after
+        # self._db.begin_invalidation() is called.  The begin_
+        # and finish_invalidation() methods acquire and release
+        # a lock.
         for oid in self._modified:
             self._db.invalidate(oid, self)
         self._db.finish_invalidation()
 
-    def sync(self):
-        # XXX Is it safe to abort things right now?
-        get_transaction().abort()
-        sync = getattr(self._storage, 'sync', None)
-        if sync is not None:
-            sync()
-        self._flush_invalidations()
-
 class Rollback:
     """Rollback changes associated with savepoint"""
 
@@ -517,6 +486,8 @@
     # XXX Should it be possible to rollback() to the same savepoint
     # more than once?
 
+    __implements__ = transaction.interfaces.IRollback
+
     def __init__(self, conn, tmp_undo):
         self._conn = conn
         self._tmp_undo = tmp_undo # undo info from the storage
@@ -642,6 +613,11 @@
         self._tindex.clear()
 
 class UndoInfo:
+    """A helper class for rollback.
+
+    The class stores the state necessary for rolling back to a
+    particular time.
+    """
 
     def __init__(self, store, pos, index):
         self._store = store


=== Zope3/src/zodb/db.py 1.4 => 1.5 ===
--- Zope3/src/zodb/db.py:1.4	Fri Jan 24 18:20:58 2003
+++ Zope3/src/zodb/db.py	Tue Jan 28 13:17:30 2003
@@ -130,15 +130,6 @@
     def abortVersion(self, version):
         AbortVersion(self, version)
 
-    # XXX I don't think the cache should be used via _cache.
-    # Not sure that both full sweep and minimize need to stay.
-
-    def cacheFullSweep(self):
-        self._connectionMap(lambda c: c._cache.full_sweep())
-
-    def cacheMinimize(self):
-        self._connectionMap(lambda c: c._cache.minimize())
-
     def close(self):
         self._storage.close()
 


=== Zope3/src/zodb/interfaces.py 1.5 => 1.6 ===
--- Zope3/src/zodb/interfaces.py:1.5	Tue Jan 28 11:42:25 2003
+++ Zope3/src/zodb/interfaces.py	Tue Jan 28 13:17:30 2003
@@ -237,6 +237,12 @@
     def root():
         """Return the root of the database."""
 
+    def sync():
+        """Process pending invalidations.
+
+        If there is a current transaction, it will be aborted.
+        """
+
 class IConnection(Interface):
     """Interface required of Connection by ZODB DB.