[Zodb-checkins] CVS: StandaloneZODB/bsddb3Storage/bsddb3Storage - Full.py:1.34

Barry Warsaw barry@wooz.org
Thu, 1 Nov 2001 18:21:08 -0500


Update of /cvs-repository/StandaloneZODB/bsddb3Storage/bsddb3Storage
In directory cvs.zope.org:/tmp/cvs-serv30678/bsddb3Storage/bsddb3Storage

Modified Files:
	Full.py 
Log Message:
Several improvements to pack(), which should be good enough for the
1.0 release.  These changes pass all the tests, but the proof will be in
packing some real data (to come next -- this stuff has to be checked
in first).  Specifically,

__init__(): We now allocate a pack-lock which is acquired in pack() to
prevent multiple threads from packing at the same time.  This should
be deadlock-proof because you should never have the storage lock when
you want to acquire the pack lock (but I could be missing something
obvious). 
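
A minimal sketch of that lock-ordering discipline (the standard
threading module stands in for ThreadLock here, and the class and
method bodies are placeholders, not the actual Full.py code):

    import threading

    class PackLockSketch:
        def __init__(self):
            self._packlock = threading.Lock()   # serializes entire packs
            self._storlock = threading.Lock()   # stands in for _lock_acquire()

        def pack(self, t, referencesf):
            # The pack lock is always taken with no other locks held...
            self._packlock.acquire()
            try:
                self._dopack(t, referencesf)
            finally:
                self._packlock.release()

        def _dopack(self, t, referencesf):
            # ... and the storage lock is only ever taken while already
            # holding the pack lock, never the reverse, so the two locks
            # cannot deadlock against each other.
            self._storlock.acquire()
            try:
                pass  # zap one object's packable revisions here
            finally:
                self._storlock.release()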

_rootreachable(): New method which calculates the set of object ids
reachable from the current revision of the root object.
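
In outline it's a plain worklist sweep over the object graph.  A
simplified sketch, assuming a hypothetical get_current_pickle(oid)
helper in place of the serials/metadata/pickles lookups the real
method does:

    ZERO = '\0' * 8   # oid of the root object

    def root_reachable(get_current_pickle, referencesf):
        reachables = {}
        lookingat = {ZERO: 1}
        while lookingat:
            oid, ignore = lookingat.popitem()
            if reachables.has_key(oid):
                continue                      # already visited
            reachables[oid] = 1
            pickle = get_current_pickle(oid)
            refoids = []
            referencesf(pickle, refoids)      # appends the referenced oids
            for refoid in refoids:
                if not reachables.has_key(refoid):
                    lookingat[refoid] = 1
        return reachables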

_zapobject(): Rewritten to remove the recursion.  It populates a
dictionary (passed in as a parameter) with the oids of any objects
whose refcounts went to zero because of the object we're now zapping.
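
So the cascade that used to be recursive becomes a loop over that
dictionary.  The pattern, sketched with zap_one standing in for
_zapobject():

    def collect_unreferenced(decrefoids, zap_one):
        # decrefoids is a dictionary used as a set: its keys are the oids
        # whose refcounts have dropped to zero and that still need zapping.
        while decrefoids:
            oid, ignore = decrefoids.popitem()
            zap_one(oid, decrefoids)    # may add more oids to decrefoids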

_zaprevision(): Same deal as _zapobject().

_dopack(): The guts of the pack() operation, done this way so pack()
itself can just be a wrapper around _dopack() with pack lock
acquisition/release.  This also does not acquire the storage lock
until it actually has to start zapping revisions; it releases and reacquires
the lock between each object's batch of revisions.  This gives other
threads an opportunity to do work during a pack, which should never
impact the pack as any work they do will happen in the future (and
we're safe against bogus future pack times).
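
Roughly, the lock juggling looks like the sketch below (zap_packable,
zap_revision, lock_acquire and lock_release are illustrative stand-ins
for _zaprevision() and the storage lock calls; packablerevs maps each
oid to the revision keys picked for removal, as in the diff below):

    def zap_packable(packablerevs, zap_revision, lock_acquire, lock_release):
        decrefoids = {}
        for oid in packablerevs.keys():
            # Hold the storage lock only while zapping one object's
            # revisions, then drop it so other threads can make progress.
            lock_acquire()
            try:
                for key in packablerevs[oid]:
                    zap_revision(key, decrefoids)
            finally:
                lock_release()
        return decrefoids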

pack(): Acquire the pack lock, _dopack(), release the pack lock.


=== StandaloneZODB/bsddb3Storage/bsddb3Storage/Full.py 1.33 => 1.34 ===
 from ZODB.TimeStamp import TimeStamp
 from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
+import ThreadLock
 
 # BerkeleyBase.BerkeleyBase class provides some common functionality for both
 # the Full and Minimal implementations.  It in turn inherits from
@@ -46,6 +47,10 @@
     #
     # Overrides of base class methods
     #
+    def __init__(self, name, env=None, prefix='zodb_'):
+        self._packlock = ThreadLock.allocate_lock()
+        BerkeleyBase.__init__(self, name, env, prefix)
+
     def _setupDBs(self):
         # Data Type Assumptions:
         #
@@ -928,25 +933,52 @@
         finally:
             self._lock_release()
 
-    def _zapobject(self, oid, referencesf):
+    def _rootreachable(self, referencesf):
+        # Quick exit for empty storages
+        if not self._serials:
+            return {}
+        reachables = {}
+        seen = {}
+        lookingat = {ZERO: 1}
+        # Start at the root, find all the objects the current revision of the
+        # root references, and then for each of those, find all the objects it
+        # references, and so on until we've traversed the entire object graph.
+        while lookingat:
+            oid, ignore = lookingat.popitem()
+            # Don't look at objects we've already seen
+            if seen.has_key(oid):
+                continue
+            seen[oid] = 1
+            reachables[oid] = 1
+            # Get the pickle data for the object's current version
+            revid = self._serials.get(oid)
+            lrevid = self._metadata[oid+revid][16:24]
+            pickle = self._pickles[oid+lrevid]
+            refdoids = []
+            referencesf(pickle, refdoids)
+            # BAW: recursion might be bad, but it's easy to implement. :(
+            for oid in refdoids:
+                lookingat[oid] = 1
+        return reachables
+
+    def _zapobject(self, oid, decrefoids, referencesf):
         # Delete all records referenced by this object
-        try:
-            self._serials.delete(oid)
-        except db.DBNotFoundError:
-            pass
+        self._serials.delete(oid)
+        # This oid should not be in the decrefoids set because it would have
+        # had to have been popitem()'d off to even get here.  Let's just make
+        # sure of that invariant...
+        assert not decrefoids.has_key(oid)
+        # We don't need to track reference counts to this object anymore
         self._refcounts.delete(oid)
         # Run through all the metadata records associated with this object,
-        # and recursively zap all its revisions.  Keep track of the tids and
-        # vids referenced by the metadata record, so that we can clean up the
-        # txnoids and currentVersions tables.
+        # and iterate through all its revisions, zapping them.  Keep track of
+        # the tids and vids referenced by the metadata record, so we can clean
+        # up the txnoids and currentVersions tables too.
         tids = {}
         vids = {}
         c = self._metadata.cursor()
         try:
-            try:
-                rec = c.set_range(oid)
-            except db.DBNotFoundError:
-                return
+            rec = c.set_range(oid)
             while rec and rec[0][:8] == oid:
                 key, data = rec
                 rec = c.next()
@@ -955,7 +987,7 @@
                 vid = data[:8]
                 if vid <> ZERO:
                     vids[vid] = 1
-                self._zaprevision(key, referencesf)
+                self._zaprevision(key, decrefoids, referencesf)
         finally:
             c.close()
         # Zap all the txnoid records...
@@ -985,140 +1017,164 @@
         finally:
             c.close()
         # ... now for each vid, delete the versions->vid and vid->versions
-        # mapping if we've deleted all references to this vid
+        # mapping if we've deleted all references to this vid.
         for vid in vids.keys():
             if self._currentVersions.get(vid):
                 continue
-            version = self._versions[vid]
+            version = self._versions.get(vid)
             self._versions.delete(vid)
             self._vids.delete(version)
 
-    def _zaprevision(self, key, referencesf):
-        # Delete the metadata record pointed to by the key, decrefing the
-        # reference counts of the pickle pointed to by this record, and
-        # perform cascading decrefs on the referenced objects.
-        #
-        # We need the lrevid which points to the pickle for this revision.
-        try:
-            lrevid = self._metadata[key][16:24]
-        except KeyError:
-            return
+    def _zaprevision(self, key, decrefoids, referencesf):
+        # For each revision we're going to delete, we need to decref the
+        # pickle this revision points at.  If that pickle gets decref'd to
+        # zero, then we know we can reclaim all of those records too.  But
+        # zapping a pickle means we then have to decref all the objects that
+        # that pickle references, and so on...
+        lrevid = self._metadata.get(key)[16:24]
         # Decref the reference count of the pickle pointed to by oid+lrevid.
         # If the reference count goes to zero, we can garbage collect the
         # pickle, and decref all the objects pointed to by the pickle (with of
         # course, cascading garbage collection).
         pkey = key[:8] + lrevid
-        refcount = U64(self._pickleRefcounts[pkey]) - 1
+        refcount = U64(self._pickleRefcounts.get(pkey)) - 1
         if refcount > 0:
+            # Not decref'd to zero, so just store the refcount back in
             self._pickleRefcounts.put(pkey, p64(refcount))
         else:
-            # The refcount of this pickle has gone to zero, so we need to
-            # garbage collect it, and decref all the objects it points to.
-            pickle = self._pickles[pkey]
+            # Decref'd to zero, so decref all the objects that this pickle
+            # refers to, and delete the pickle entries.  If the objects
+            # themselves get decref'd to zero, add them to the decrefoids set
+            # for later collection
+            pickle = self._pickles.get(pkey)
             # Sniff the pickle to get the objects it refers to
             refoids = []
             referencesf(pickle, refoids)
-            # Now decref the reference counts for each of those objects.  If
-            # the object's refcount goes to zero, remember the oid so we can
-            # recursively zap its metadata records too.
-            collectables = {}
             for oid in refoids:
-                refcount = U64(self._refcounts[oid]) - 1
+                refcount = U64(self._refcounts.get(oid)) - 1
                 if refcount > 0:
                     self._refcounts.put(oid, p64(refcount))
                 else:
-                    collectables[oid] = 1
-            # Garbage collect all objects with refcounts that just went to
-            # zero.
-            for oid in collectables.keys():
-                self._zapobject(oid, referencesf)
-            # We can now delete both the pickleRefcounts and pickle entry for
-            # this garbage collected pickle.
+                    decrefoids[oid] = 1
+            # We're done with the pickleRefcounts and pickle entries for this
+            # garbage collected pickle data.
             self._pickles.delete(pkey)
             self._pickleRefcounts.delete(pkey)
-        # We can now delete this metadata record.
+        # Now we're done with this metadata record
         self._metadata.delete(key)
+        # We'll erase the knowledge that this tid touched this object
+        tid = key[8:]
+        c = self._txnoids.cursor()
+        try:
+            # Yes, the key here is the tid, which is the second 8 bytes, while
+            # the value is the oid, which is the first 8 bytes.
+            c.set_both(tid, key[:8])
+            c.delete()
+        finally:
+            c.close()
+        # Now we've done one of two things: 1) we've removed the last
+        # record of an oid being modified in this transaction, in which case
+        # we can garbage collect the txnMetadata because there aren't any
+        # object revisions that reference this transaction.  Or, 2) there are
+        # still uncollected objects in this transaction, in which case we
+        # must mark this transaction as packed so that undo can't possibly
+        # introduce a temporal anomaly.
+        if self._txnoids.has_key(tid):
+            meta = self._txnMetadata.get(tid)[1:]
+            self._txnMetadata.put(tid, PROTECTED_TRANSACTION + meta)
+        else:
+            self._txnMetadata.delete(tid)
 
-    def pack(self, t, referencesf):
-        # BAW: This doesn't play nicely if you enable the `debugging revids'
-        #
+    def _dopack(self, t, referencesf):
         # t is a TimeTime, or time float, convert this to a TimeStamp object,
-        # using an algorithm similar to what's used in FileStorage.  The
-        # TimeStamp can then be used as a key in the txnMetadata table, since
-        # we know our revision id's are actually TimeStamps.
-        t0 = TimeStamp(*(time.gmtime(t)[:5] + (t%60,)))
-        self._lock_acquire()
-        c = None
-        tidmarks = {}
-        oids = {}
-        try:    
-            # Figure out when to pack to.  We happen to know that our
-            # transaction ids are really timestamps.
-            c = self._txnoids.cursor()
-            # Need to use the repr of the TimeStamp so we get a string
+        # using an algorithm similar to what's used in FileStorage.  We know
+        # that our transaction ids, a.k.a. revision ids, are timestamps.  BAW:
+        # This doesn't play nicely if you enable the `debugging revids'
+        #
+        # BAW: should a pack time in the future be a ValueError?  We'd have to
+        # worry about clock skew, so for now, we just set the pack time to the
+        # minimum of t and now.
+        packtime = min(t, time.time())
+        t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
+        packtid = `t0`
+        # Calculate the set of objects reachable from the root.  Anything else
+        # is a candidate for having all their revisions packed away.
+        reachables = self._rootreachable(referencesf)
+        # We now cruise through all the objects we know about, i.e. the keys
+        # of the serials table, looking at all the object revisions earlier
+        # than the pack time.  If the revision is not the current revision,
+        # then it's a packable revision.  We employ a BDB trick of set_range()
+        # to give us the smallest record greater than or equal to the one we
+        # ask for.  We move to the one just before that, and cruise backwards.
+        #
+        # This should also make us immune to evil future-pack time values,
+        # although it would still be better to raise a ValueError in those
+        # situations.  This is a dictionary keyed off the object id, with
+        # values which are a list of revisions (oid+tid) that can be packed.
+        packablerevs = {}
+        c = self._metadata.cursor()
+        # BAW: two threads can't be packing at the same time any more;
+        # pack() enforces that by acquiring the pack lock before calling us.
+        for oid in self._serials.keys():
             try:
-                rec = c.set_range(`t0`)
+                rec = c.set_range(oid+packtid)
+                # The one just before this should be the largest record less
+                # than or equal to the key, i.e. the object revision just
+                # before the given pack time.
+                rec = c.prev()
             except db.DBNotFoundError:
+                # Perhaps the last record in the database is the last one
+                # containing this oid?
                 rec = c.last()
+            # Now move backwards in time to look at all the revisions of this
+            # object.  All but the current one are packable, unless the object
+            # isn't reachable from the root, in which case, all its revisions
+            # are packable.
             while rec:
-                tid, oid = rec
+                key, data = rec
                 rec = c.prev()
-                # We need to mark this transaction as having participated in a
-                # pack, so that undo will not create a temporal anomaly.
-                if not tidmarks.has_key(tid):
-                    meta = self._txnMetadata[tid]
-                    # Has this transaction already been packed?  If so, we can
-                    # stop here... I think!
-                    if meta[0] == PROTECTED_TRANSACTION:
-                        break
-                    self._txnMetadata[tid] = PROTECTED_TRANSACTION + meta[1:]
-                    tidmarks[tid] = 1
-                # For now, just remember which objects are touched by the
-                # packable
-                if oid <> ZERO:
-                    oids[oid] = 1
-            # Now look at every object revision metadata record for the
-            # objects that have been touched in the packable transactions.  If
-            # the metadata record points at the current revision of the
-            # object, ignore it, otherwise reclaim it.
-            c.close()
-            c = self._metadata.cursor()
-            for oid in oids.keys():
-                try:
-                    current = self._serials[oid]
-                except KeyError:
-                    continue
-                try:
-                    rec = c.set_range(oid)
-                except db.DBNotFoundError:
-                    continue
-                while rec:
-                    key, data = rec
-                    rec = c.next()
-                    if key[:8] <> oid:
-                        break
-                    if key[8:] == current:
-                        continue
-                    self._zaprevision(key, referencesf)
-                # Now look and see if the object has a reference count of
-                # zero, and if so garbage collect it.  refcounts will be None
-                # if the reference count of this object is zero, i.e. it won't
-                # be in the table.
-                refcounts = self._refcounts.get(oid)
-                if not refcounts:
-                    # The current revision should be the only revision of this
-                    # object that exists, otherwise its refcounts shouldn't be
-                    # zero.
-                    self._zaprevision(oid+current, referencesf)
-                    # And delete a few other records that _zaprevisions()
-                    # doesn't clean up
-                    self._serials.delete(oid)
-                    if refcounts is not None:
-                        self._refcounts.delete(oid)
+                # Make sure we're still looking at revisions for this object
+                if oid <> key[:8]:
+                    break
+                if not reachables.has_key(oid):
+                    packablerevs.setdefault(oid, []).append(key)
+                # Otherwise, if this isn't the current revision for this
+                # object, then it's packable.
+                elif self._serials[oid] <> key[8:]:
+                    packablerevs.setdefault(oid, []).append(key)
+        # We now have all the packable revisions we're going to handle.  For
+        # each object with revisions that we're going to pack away, acquire
+        # the storage lock so we can do that without fear of trampling by
+        # other threads (i.e. interaction of transactionalUndo() and pack()).
+        #
+        # This set contains the oids of all objects that have been decref'd
+        # to zero by the pack operation.  To avoid recursion, we'll just note
+        # them now and handle them in a loop later.
+        #
+        # BAW: should packs be transaction protected?
+        decrefoids = {}
+        for oid in packablerevs.keys():
+            self._lock_acquire()
+            try:
+                for key in packablerevs[oid]:
+                    self._zaprevision(key, decrefoids, referencesf)
+            finally:
+                self._lock_release()
+        # While there are still objects to collect, continue to do so.
+        # Note that collecting an object may reveal more objects whose
+        # refcounts have dropped to zero.
+        while decrefoids:
+            oid, ignore = decrefoids.popitem()
+            self._zapobject(oid, decrefoids, referencesf)
+
+    def pack(self, t, referencesf):
+        # A simple wrapper around the bulk of packing, but which acquires a
+        # lock that prevents multiple packs from running at the same time.
+        self._packlock.acquire()
+        try:
+            self._dopack(t, referencesf)
         finally:
-            if c:
-                c.close()
-            self._lock_release()
+            self._packlock.release()
 
     # GCable interface, for cyclic garbage collection
     #