[Zope-Checkins] SVN: Zope/trunk/lib/python/Products/Transience/ Merge transience changes from chrism-pre27-branch.

Chris McDonough chrism at plope.com
Fri Sep 17 23:27:58 EDT 2004


Log message for revision 27630:
  Merge transience changes from chrism-pre27-branch.
  


Changed:
  U   Zope/trunk/lib/python/Products/Transience/HowTransienceWorks.stx
  A   Zope/trunk/lib/python/Products/Transience/TransactionHelper.py
  U   Zope/trunk/lib/python/Products/Transience/Transience.py
  U   Zope/trunk/lib/python/Products/Transience/TransientObject.py
  U   Zope/trunk/lib/python/Products/Transience/dtml/manageTransientObjectContainer.dtml
  A   Zope/trunk/lib/python/Products/Transience/tests/testCounters.py
  A   Zope/trunk/lib/python/Products/Transience/tests/testTransactionHelper.py
  U   Zope/trunk/lib/python/Products/Transience/tests/testTransientObjectContainer.py


-=-
Modified: Zope/trunk/lib/python/Products/Transience/HowTransienceWorks.stx
===================================================================
--- Zope/trunk/lib/python/Products/Transience/HowTransienceWorks.stx	2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/HowTransienceWorks.stx	2004-09-18 03:27:57 UTC (rev 27630)
@@ -42,6 +42,7 @@
     inside of the "_data" structure.  There is a concept of a
     "current" bucket, which is the bucket that is contained within the
     _data structure with a key equal to the "current" timeslice.
+    A current bucket must always exist (this is an invariant).
 
   - A "max_timeslice" integer, which is equal to the "largest"
     timeslice for which there exists a bucket in the _data structure.
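
To make the bucket layout described above concrete, here is a minimal
sketch of the _data invariant (illustrative only; the real TOC uses an
IOBTree keyed by timeslice and persistent bucket classes, and the helper
names here are hypothetical):

    import time

    PERIOD = 20  # seconds per timeslice (self._period in the TOC)

    def current_timeslice(period=PERIOD):
        # a timeslice is "now" rounded down to a multiple of the period
        now = int(time.time())
        return now - (now % period)

    _data = {}                         # timeslice int -> bucket
    ts = current_timeslice()
    _data[ts] = {}                     # the "current" bucket (the invariant)
    _data[ts + PERIOD] = {}            # a spare bucket for the next timeslice
    max_timeslice = max(_data.keys())  # "largest" timeslice with a bucket
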
@@ -74,10 +75,13 @@
 Replentishing
 
   The TOC performs "finalization", "garbage collection", and "bucket
-  replentishing".  It performs these tasks "in-band".  This means that
-  the TOC does not maintain a separate thread that wakes up every so
-  often to do these housekeeping tasks.  Instead, during the course of
-  normal operations, the TOC opportunistically performs them.
+  replentishing".  It typically performs these tasks "in-band"
+  (although it is possible to do the housekeeping tasks "out of band"
+  as well: see the methods of the Transient Object Container with
+  "housekeep" in their names).  "In band" housekeeping implies that
+  the TOC does not maintain a separate thread or process that wakes up
+  every so often to clean up.  Instead, during the course of normal
+  operations, the TOC opportunistically performs housekeeping functions.
 
   Finalization is defined as optionally calling a function at bucket
   expiration time against all transient objects contained within that

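As a concrete illustration of finalization, the TOC's delete
notification hook can be pointed at a handler that runs once per expired
object.  setDelNotificationTarget is the real container API (see the
Transience.py diff below); the handler body and its (item, container)
signature follow the Sessions documentation and should be read as a
sketch:

    from zLOG import LOG, INFO

    def on_expire(item, container):
        # called for each transient object in a bucket being finalized
        LOG('MyApp', INFO, 'transient object %s expired' % item.getId())

    # toc is an existing TransientObjectContainer instance
    toc.setDelNotificationTarget(on_expire)
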
Added: Zope/trunk/lib/python/Products/Transience/TransactionHelper.py
===================================================================
--- Zope/trunk/lib/python/Products/Transience/TransactionHelper.py	2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/TransactionHelper.py	2004-09-18 03:27:57 UTC (rev 27630)
@@ -0,0 +1,36 @@
+import time
+
+class PreventTransactionCommit(Exception):
+    def __init__(self, reason):
+        self.reason = reason
+
+    def __str__(self):
+        return "Uncommittable transaction: " % self.reason
+    
+class UncommittableJar:
+    """ A jar that cannot be committed """
+    def __init__(self, reason):
+        self.reason = reason
+        self.time = time.time()
+        
+    def sort_key(self):
+        return self.time
+
+    def tpc_begin(self, *arg, **kw):
+        pass
+
+    def commit(self, obj, transaction):
+        pass
+
+    def tpc_vote(self, transaction):
+        raise PreventTransactionCommit(self.reason)
+
+class makeTransactionUncommittable:
+    """
+    - register an uncommittable object with the provided transaction
+      which prevents the commit of that transaction
+    """
+    def __init__(self, transaction, reason):
+        self._p_jar = UncommittableJar(reason)
+        transaction.register(self)
+        

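A minimal usage sketch for the helper above (this mirrors the unit test
added in testTransactionHelper.py below; get_transaction() is the
Zope 2.7-era transaction accessor):

    from Products.Transience.TransactionHelper import \
         PreventTransactionCommit, makeTransactionUncommittable

    t = get_transaction()                       # the current transaction
    makeTransactionUncommittable(t, "testing")  # registers the blocking jar
    try:
        t.commit()                              # tpc_vote raises
    except PreventTransactionCommit:
        pass                                    # commit vetoed, as intended
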
Modified: Zope/trunk/lib/python/Products/Transience/Transience.py
===================================================================
--- Zope/trunk/lib/python/Products/Transience/Transience.py	2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/Transience.py	2004-09-18 03:27:57 UTC (rev 27630)
@@ -30,13 +30,13 @@
      TTWDictionary, ImmutablyValuedMappingOfPickleableObjects,\
      StringKeyedHomogeneousItemContainer, TransientItemContainer
 
-from BTrees.Length import Length
+from BTrees.Length import Length as BTreesLength
 from BTrees.OOBTree import OOBTree
 from BTrees.IOBTree import IOBTree
-from ZODB.POSException import ConflictError
 
 from Persistence import Persistent
 from OFS.SimpleItem import SimpleItem
+from ZPublisher.Publish import Retry
 from AccessControl import ClassSecurityInfo, getSecurityManager
 from AccessControl.SecurityManagement import newSecurityManager, \
      setSecurityManager
@@ -129,6 +129,7 @@
 
     _limit = 0
     _data = None
+    _inband_housekeeping = True
 
     security.setDefaultAccess('deny')
 
@@ -206,7 +207,7 @@
         # We make enough buckets initially to last us a while, and
         # we subsequently extend _data with fresh buckets and remove old
         # buckets as necessary during normal operations (see
-        # _gc() and _replentish()).
+        # _replentish() and _gc()).
         self._data = DATA_CLASS()
 
         # populate _data with some number of buckets, each of which
@@ -232,6 +233,10 @@
         # each expired item.
         self._last_finalized_timeslice = Increaser(-self._period)
 
+        # '_last_gc_timeslice' is a value that indicates in which
+        # timeslice the garbage collection process was last run.
+        self._last_gc_timeslice = Increaser(-self._period)
+        
         # our "_length" is the number of "active" data objects in _data.
         # it does not include items that are still kept in _data but need to
         # be garbage collected.
@@ -239,8 +244,10 @@
         # we need to maintain the length of the index structure separately
         # because getting the length of a BTree is very expensive, and it
         # doesn't really tell us which ones are "active" anyway.
-        try: self._length.set(0)
-        except AttributeError: self._length = self.getLen = Length()
+        try:
+            self._length.set(0)
+        except AttributeError:
+            self._length = self.getLen = Length2()
 
     def _getCurrentSlices(self, now):
         if self._timeout_slices:
@@ -269,16 +276,15 @@
             bucket = self._data.get(0)
             return bucket.get(k, default)
 
-        # always call finalize
-        self._finalize(current_ts)
+        if self._inband_housekeeping:
+            self._housekeep(current_ts)
 
-        # call gc and/or replentish on an only-as needed basis
-        if self._roll(current_ts, 'replentish'):
-            self._replentish(current_ts)
+        else:
+            # dont allow the TOC to stop working in an emergency bucket
+            # shortage
+            if self._in_emergency_bucket_shortage(current_ts):
+                self._replentish(current_ts)
 
-        if self._roll(current_ts, 'gc'):
-            self._gc(current_ts)
-
         # SUBTLETY ALERTY TO SELF: do not "improve" the code below
         # unnecessarily, as it will end only in tears.  The lack of aliases
         # and the ordering is intentional.
@@ -288,7 +294,8 @@
         found_ts = None
 
         for ts in current_slices:
-            abucket = self._data.get(ts, None)
+            abucket = self._data.get(ts, None) # XXX ReadConflictError hotspot
+
             if abucket is None:
                 DEBUG and TLOG('_move_item: no bucket for ts %s' % ts)
                 continue
@@ -348,14 +355,13 @@
         else:
             current_ts = 0
 
-        self._finalize(current_ts)
+        if self._inband_housekeeping:
+            self._housekeep(current_ts)
 
-        if self._roll(current_ts, 'replentish'):
-            self._replentish(current_ts)
+        elif self._in_emergency_bucket_shortage(current_ts):
+            # if our scheduler fails, dont allow the TOC to stop working
+            self._replentish(current_ts, force=True)
 
-        if self._roll(current_ts, 'gc'):
-            self._gc(current_ts)
-
         STRICT and _assert(self._data.has_key(current_ts))
         current = self._getCurrentSlices(current_ts)
 
@@ -374,8 +380,8 @@
     def keys(self):
         return self._all().keys()
 
-    def rawkeys(self, current_ts):
-        # for debugging
+    def raw(self, current_ts):
+        # for debugging and unit testing
         current = self._getCurrentSlices(current_ts)
 
         current.reverse() # overwrite older with newer
@@ -425,15 +431,20 @@
         STRICT and _assert(self._data.has_key(current_ts))
         if item is _marker:
             # the key didnt already exist, this is a new item
-            if self._limit and len(self) >= self._limit:
+
+            length = self._length() # XXX ReadConflictError hotspot
+
+            if self._limit and length >= self._limit:
                 LOG('Transience', WARNING,
                     ('Transient object container %s max subobjects '
                      'reached' % self.getId())
                     )
                 raise MaxTransientObjectsExceeded, (
                  "%s exceeds maximum number of subobjects %s" %
-                 (len(self), self._limit))
-            self._length.change(1)
+                 (length, self._limit))
+
+            self._length.increment(1)
+
         DEBUG and TLOG('__setitem__: placing value for key %s in bucket %s' %
                        (k, current_ts))
         current_bucket = self._data[current_ts]
@@ -460,7 +471,11 @@
         if not issubclass(BUCKET_CLASS, Persistent):
             # tickle persistence machinery
             self._data[current_ts] = bucket
-        self._length.change(-1)
+
+        # XXX does increment(-1) make any sense here?
+        # rationale from dunny: we are removing an item rather than simply
+        # declaring it to be unused?
+        self._length.increment(-1)
         return current_ts, item
 
     def __len__(self):
@@ -496,79 +511,46 @@
         DEBUG and TLOG('has_key: returning false from for %s' % k)
         return False
 
-    def _roll(self, now, reason):
-        """
-        Roll the dice to see if we're the lucky thread that does
-        bucket replentishment or gc.  This method is guaranteed to return
-        true at some point as the difference between high and low naturally
-        diminishes to zero.
+    def _get_max_expired_ts(self, now):
+        return now - (self._period * (self._timeout_slices + 1))
 
-        The reason we do the 'random' dance in the last part of this
-        is to minimize the chance that two threads will attempt to
-        do housekeeping at the same time (causing conflicts).
-        """
+    def _in_emergency_bucket_shortage(self, now):
+        max_ts = self._max_timeslice()
         low = now/self._period
-        high = self._max_timeslice()/self._period
-        if high <= low:
-            # we really need to win this roll because we have no
-            # spare buckets (and no valid values to provide to randrange), so
-            # we rig the toss.
-            DEBUG and TLOG('_roll: %s rigged toss' % reason)
-            return True
-        else:
-            # we're not in an emergency bucket shortage, so we can
-            # take our chances during the roll.  It's unlikely that
-            # two threads will win the roll simultaneously, so we
-            # avoid a certain class of conflicts here.
-            if random.randrange(low, high) == low: # WINNAH!
-                DEBUG and TLOG("_roll: %s roll winner" % reason)
-                return True
-        DEBUG and TLOG("_roll: %s roll loser" % reason)
-        return False
+        high = max_ts/self._period
+        required = high <= low
+        return required
 
-    def _get_max_expired_ts(self, now):
-        return now - (self._period * (self._timeout_slices + 1))
-
     def _finalize(self, now):
+        """ Call finalization handlers for the data in each stale bucket """
         if not self._timeout_slices:
             DEBUG and TLOG('_finalize: doing nothing (no timeout)')
             return # don't do any finalization if there is no timeout
 
         # The nature of sessioning is that when the timeslice rolls
         # over, all active threads will try to do a lot of work during
-        # finalization, all but one unnecessarily.  We really don't
-        # want more than one thread at a time to try to finalize
-        # buckets at the same time so we try to lock. We give up if we
-        # can't lock immediately because it doesn't matter if we skip
-        # a couple of opportunities for finalization, as long as it
-        # gets done by some thread eventually.  A similar pattern
-        # exists for _gc and _replentish.
+        # finalization if inband housekeeping is enabled, all but one
+        # unnecessarily.  We really don't want more than one thread at
+        # a time to try to finalize buckets at the same time so we try
+        # to lock. We give up if we can't lock immediately because it
+        # doesn't matter if we skip a couple of opportunities for
+        # finalization, as long as it gets done by some thread
+        # eventually.  A similar pattern exists for _gc and
+        # _replentish.
 
         if not self.finalize_lock.acquire(0):
-            DEBUG and TLOG('_finalize: couldnt acquire lock')
+            DEBUG and TLOG('_finalize: could not acquire lock, returning')
             return
 
         try:
             DEBUG and TLOG('_finalize: lock acquired successfully')
+            last_finalized = self._last_finalized_timeslice()
 
-            if now is None:
-                now = getCurrentTimeslice(self._period) # for unit tests
-
             # we want to start finalizing from one timeslice after the
-            # timeslice which we last finalized.  Note that finalizing
-            # an already-finalized bucket somehow sends persistence
-            # into a spin with an exception later raised:
-            # "SystemError: error return without exception set",
-            # typically coming from
-            # Products.Sessions.SessionDataManager, line 182, in
-            # _getSessionDataObject (if getattr(ob, '__of__', None)
-            # and getattr(ob, 'aq_parent', None)). According to this
-            # email message from Jim, it may be because the ob is
-            # ghosted and doesn't have a _p_jar somehow:
-            #http://mail.zope.org/pipermail/zope3-dev/2003-February/005625.html
+            # timeslice which we last finalized.
+            
+            start_finalize  = last_finalized + self._period
 
-            start_finalize  = self._last_finalized_timeslice() + self._period
-
             # we want to finalize only up to the maximum expired timeslice
             max_ts = self._get_max_expired_ts(now)
 
@@ -577,124 +559,221 @@
                     '_finalize: start_finalize (%s) >= max_ts (%s), '
                     'doing nothing' % (start_finalize, max_ts))
                 return
-        
-            DEBUG and TLOG('_finalize: now is %s' % now)
-            DEBUG and TLOG('_finalize: max_ts is %s' % max_ts)
-            DEBUG and TLOG('_finalize: start_finalize is %s' % start_finalize)
+            else:
+                DEBUG and TLOG(
+                    '_finalize: start_finalize (%s) <= max_ts (%s), '
+                    'finalization possible' % (start_finalize, max_ts))
+                # we don't try to avoid conflicts here by doing a "random"
+                # dance (ala _replentish and _gc) because it's important that
+                # buckets are finalized as soon as possible after they've
+                # expired in order to call the delete notifier "on time".
+                self._do_finalize_work(now, max_ts, start_finalize)
 
-            to_finalize = list(self._data.keys(start_finalize, max_ts))
-            DEBUG and TLOG('_finalize: to_finalize is %s' % `to_finalize`)
+        finally:
+            self.finalize_lock.release()
 
-            delta = 0
+    def _do_finalize_work(self, now, max_ts, start_finalize):
+        # this is only separated from _finalize for readability; it
+        # should generally not be called by anything but _finalize
+        DEBUG and TLOG('_do_finalize_work: entering')
+        DEBUG and TLOG('_do_finalize_work: now is %s' % now)
+        DEBUG and TLOG('_do_finalize_work: max_ts is %s' % max_ts)
+        DEBUG and TLOG('_do_finalize_work: start_finalize is %s' %
+                       start_finalize)
 
-            for key in to_finalize:
+        to_finalize = list(self._data.keys(start_finalize, max_ts))
+        DEBUG and TLOG('_do_finalize_work: to_finalize is %s' % `to_finalize`)
 
-                assert(start_finalize <= key <= max_ts)
-                STRICT and _assert(self._data.has_key(key))
-                values = list(self._data[key].values())
-                DEBUG and TLOG('_finalize: values to notify from ts %s '
-                               'are %s' % (key, `list(values)`))
+        delta = 0
 
-                delta += len(values)
+        for key in to_finalize:
 
-                for v in values:
-                    self.notifyDel(v)
+            _assert(start_finalize <= key)
+            _assert(key <= max_ts)
+            STRICT and _assert(self._data.has_key(key))
+            values = list(self._data[key].values())
+            DEBUG and TLOG('_do_finalize_work: values to notify from ts %s '
+                           'are %s' % (key, `list(values)`))
 
-            if delta:
-                self._length.change(-delta)
+            delta += len(values)
 
-            DEBUG and TLOG('_finalize: setting _last_finalized_timeslice '
-                           'to max_ts of %s' % max_ts)
+            for v in values:
+                self.notifyDel(v)
 
-            self._last_finalized_timeslice.set(max_ts)
+        if delta:
+            self._length.decrement(delta)
 
-        finally:
-            self.finalize_lock.release()
+        DEBUG and TLOG('_do_finalize_work: setting _last_finalized_timeslice '
+                       'to max_ts of %s' % max_ts)
 
+        self._last_finalized_timeslice.set(max_ts)
+
+    def _invoke_finalize_and_gc(self):
+        # for unit testing purposes only!
+        last_finalized = self._last_finalized_timeslice()
+        now = getCurrentTimeslice(self._period) # for unit tests
+        start_finalize  = last_finalized + self._period
+        max_ts = self._get_max_expired_ts(now)
+        self._do_finalize_work(now, max_ts, start_finalize)
+        self._do_gc_work(now)
+
     def _replentish(self, now):
-        # available_spares == the number of "spare" buckets that exist in
-        # "_data"
+        """ Add 'fresh' future or current buckets """
         if not self._timeout_slices:
-            return # do nothing if no timeout
+            DEBUG and TLOG('_replentish: no timeout, doing nothing')
+            return
         
-        if not self.replentish_lock.acquire(0):
-            DEBUG and TLOG('_replentish: couldnt acquire lock')
-            return
+        # the difference between high and low naturally diminishes to
+        # zero as now approaches self._max_timeslice() during normal
+        # operations.  If high <= low, it means we have no current bucket,
+        # so we *really* need to replentish (having a current bucket is
+        # an invariant for continued operation).
 
+        required = self._in_emergency_bucket_shortage(now)
+        lock_acquired = self.replentish_lock.acquire(0)
+
         try:
-            max_ts = self._max_timeslice()
-            available_spares = (max_ts-now) / self._period
-            DEBUG and TLOG('_replentish: now = %s' % now)
-            DEBUG and TLOG('_replentish: max_ts = %s' % max_ts)
-            DEBUG and TLOG('_replentish: available_spares = %s'
-                           % available_spares)
+            if required:
+                # we're in an emergency bucket shortage, we need to
+                # replentish regardless of whether we got the lock or
+                # not.  (if we didn't get the lock, this transaction
+                # will likely result in a conflict error, that's ok)
+                if lock_acquired:
+                    DEBUG and TLOG('_replentish: required, lock acquired')
+                else:
+                    DEBUG and TLOG('_replentish: required, lock NOT acquired')
+                max_ts = self._max_timeslice()
+                self._do_replentish_work(now, max_ts)
 
-            if available_spares >= SPARE_BUCKETS:
-                DEBUG and TLOG('_replentish: available_spares (%s) >= '
-                               'SPARE_BUCKETS (%s), doing '
-                               'nothing'% (available_spares,
-                                           SPARE_BUCKETS))
-                return
+            elif lock_acquired:
+                # If replentish is optional, minimize the chance that
+                # two threads will attempt to do replentish work at
+                # the same time (which causes conflicts) by
+                # introducing a random element.
+                DEBUG and TLOG('_replentish: attempting optional replentish '
+                               '(lock acquired)')
+                max_ts = self._max_timeslice()
+                low = now/self._period
+                high = max_ts/self._period
+                if roll(low, high, 'optional replentish'):
+                    self._do_replentish_work(now, max_ts)
 
-            if max_ts < now:
-                replentish_start = now
-                replentish_end = now + (self._period * SPARE_BUCKETS)
-
             else:
-                replentish_start = max_ts + self._period
-                replentish_end = max_ts + (self._period * SPARE_BUCKETS)
+                # This is an optional replentish and we can't acquire
+                # the lock, bail.
+                DEBUG and TLOG('_replentish: optional attempt aborted, '
+                               'could not acquire lock.')
+                return
 
-            DEBUG and TLOG('_replentish: replentish_start = %s' %
-                           replentish_start)
-            DEBUG and TLOG('_replentish: replentish_end = %s'
-                           % replentish_end)
-            # n is the number of buckets to create
-            n = (replentish_end - replentish_start) / self._period
-            new_buckets = getTimeslices(replentish_start, n, self._period)
-            new_buckets.reverse()
-            STRICT and _assert(new_buckets)
-            DEBUG and TLOG('_replentish: adding %s new buckets' % n)
-            DEBUG and TLOG('_replentish: buckets to add = %s'
-                           % new_buckets)
-            for k in new_buckets:
-                STRICT and _assert(not self._data.has_key(k))
-                try:
-                    self._data[k] = BUCKET_CLASS()
-                except ConflictError:
-                    DEBUG and TLOG('_replentish: conflict when adding %s' % k)
-                    time.sleep(random.uniform(0, 1)) # add entropy
-                    raise
-            self._max_timeslice.set(max(new_buckets))
         finally:
-            self.replentish_lock.release()
+            if lock_acquired:
+                self.replentish_lock.release()
 
+    def _do_replentish_work(self, now, max_ts):
+        DEBUG and TLOG('_do_replentish_work: entering')
+        # this is only separated from _replentish for readability; it
+        # should generally not be called by anything but _replentish
+
+        # available_spares == the number of "spare" buckets that exist
+        # in "_data"
+        available_spares = (max_ts - now) / self._period
+        DEBUG and TLOG('_do_replentish_work: now = %s' % now)
+        DEBUG and TLOG('_do_replentish_work: max_ts = %s' % max_ts)
+        DEBUG and TLOG('_do_replentish_work: available_spares = %s'
+                       % available_spares)
+
+        if available_spares >= SPARE_BUCKETS:
+            DEBUG and TLOG('_do_replentish_work: available_spares (%s) >= '
+                           'SPARE_BUCKETS (%s), doing '
+                           'nothing' % (available_spares,
+                                       SPARE_BUCKETS))
+            return
+
+        if max_ts < now:
+            # the newest bucket in self._data is older than now!
+            replentish_start = now
+            replentish_end = now + (self._period * SPARE_BUCKETS)
+
+        else:
+            replentish_start = max_ts + self._period
+            replentish_end = max_ts + (self._period * (SPARE_BUCKETS + 1))
+
+        DEBUG and TLOG('_do_replentish_work: replentish_start = %s' %
+                       replentish_start)
+        DEBUG and TLOG('_do_replentish_work: replentish_end = %s'
+                       % replentish_end)
+        # n is the number of buckets to create
+        n = (replentish_end - replentish_start) / self._period
+        new_buckets = getTimeslices(replentish_start, n, self._period)
+        new_buckets.reverse()
+        STRICT and _assert(new_buckets)
+        DEBUG and TLOG('_do_replentish_work: adding %s new buckets' % n)
+        DEBUG and TLOG('_do_replentish_work: buckets to add = %s'
+                       % new_buckets)
+        for k in new_buckets:
+            STRICT and _assert(not self._data.has_key(k))
+            self._data[k] = BUCKET_CLASS() # XXX ReadConflictError hotspot
+
+        self._max_timeslice.set(max(new_buckets))
+
     def _gc(self, now=None):
+        """ Remove stale buckets """
         if not self._timeout_slices:
             return # dont do gc if there is no timeout
 
+        # give callers a good chance to do nothing (gc isn't as important
+        # as replentishment or finalization)
+        if not roll(0, 5, 'gc'):
+            DEBUG and TLOG('_gc: lost roll, doing nothing')
+            return
+
         if not self.gc_lock.acquire(0):
             DEBUG and TLOG('_gc: couldnt acquire lock')
             return
 
-        try:
+        try: 
             if now is None:
                 now = getCurrentTimeslice(self._period) # for unit tests
 
-            # we want to garbage collect all buckets that have already been run
-            # through finalization
-            max_ts = self._last_finalized_timeslice()
+            last_gc = self._last_gc_timeslice()
+            gc_every = self._period * round(SPARE_BUCKETS / 2.0)
 
-            DEBUG and TLOG('_gc: now is %s' % now)
-            DEBUG and TLOG('_gc: max_ts is %s' % max_ts)
+            if (now - last_gc) < gc_every:
+                DEBUG and TLOG('_gc: gc attempt not yet required '
+                               '( (%s - %s) < %s )' % (now, last_gc, gc_every))
+                return
+            else:
+                DEBUG and TLOG(
+                    '_gc: (%s - %s) >= %s, gc invoked' % (now, last_gc,
+                                                          gc_every))
+                self._do_gc_work(now)
 
-            for key in list(self._data.keys(None, max_ts)):
-                assert(key <= max_ts)
-                STRICT and _assert(self._data.has_key(key))
-                DEBUG and TLOG('deleting %s from _data' % key)
-                del self._data[key]
         finally:
             self.gc_lock.release()
 
+    def _do_gc_work(self, now):
+        # this is only separated from _gc for readability; it should
+        # generally not be called by anything but _gc
+
+        # we garbage collect any buckets that have already been run
+        # through finalization
+        DEBUG and TLOG('_do_gc_work: entering')
+
+        max_ts = self._last_finalized_timeslice()
+
+        DEBUG and TLOG('_do_gc_work: max_ts is %s' % max_ts)
+        to_gc = list(self._data.keys(None, max_ts))
+        DEBUG and TLOG('_do_gc_work: to_gc is: %s' % str(to_gc))
+
+        for key in to_gc:
+            _assert(key <= max_ts)
+            STRICT and _assert(self._data.has_key(key))
+            DEBUG and TLOG('_do_gc_work: deleting %s from _data' % key)
+            del self._data[key]
+
+        DEBUG and TLOG('_do_gc_work: setting last_gc_timeslice to %s' % now)
+        self._last_gc_timeslice.set(now)
+
     def notifyAdd(self, item):
         DEBUG and TLOG('notifyAdd with %s' % item)
         callback = self._getCallback(self._addCallback)
@@ -830,13 +909,37 @@
     def setDelNotificationTarget(self, f):
         self._delCallback = f
 
-    security.declareProtected(MGMT_SCREEN_PERM, 'nudge')
-    def nudge(self):
-        """ Used by mgmt interface to maybe do housekeeping each time
-        a screen is shown """
-        # run garbage collector so view is correct
-        self._gc()
+    security.declareProtected(MGMT_SCREEN_PERM, 'disableInbandHousekeeping')
+    def disableInbandHousekeeping(self):
+        """ No longer perform inband housekeeping """
+        self._inband_housekeeping = False
 
+    security.declareProtected(MGMT_SCREEN_PERM, 'enableInbandHousekeeping')
+    def enableInbandHousekeeping(self):
+        """ (Re)enable inband housekeeping """
+        self._inband_housekeeping = True
+
+    security.declareProtected(MGMT_SCREEN_PERM, 'isInbandHousekeepingEnabled')
+    def isInbandHousekeepingEnabled(self):
+        """ Report if inband housekeeping is enabled """
+        return self._inband_housekeeping
+
+    security.declareProtected('View', 'housekeep')
+    def housekeep(self):
+        """ Call this from a scheduler at least every
+        self._period * (SPARE_BUCKETS - 1) seconds to perform out of band
+        housekeeping """
+        # we can protect this method from being called too often by
+        # anonymous users as necessary in the future; we already have a lot
+        # of protection as-is though so no need to make it more complicated
+        # than necessary at the moment
+        self._housekeep(getCurrentTimeslice(self._period))
+
+    def _housekeep(self, now):
+        self._finalize(now)
+        self._replentish(now)
+        self._gc(now)
+
     security.declareProtected(MANAGE_CONTAINER_PERM,
         'manage_changeTransientObjectContainer')
     def manage_changeTransientObjectContainer(
@@ -868,9 +971,17 @@
 
         # f/w compat: 2.8 cannot use __len__ as an instance variable
         if not state.has_key('_length'):
-            length = state.get('__len__', Length())
+            length = state.get('__len__', Length2())
             self._length = self.getLen = length
 
+        oldlength = state.get('_length')
+        if isinstance(oldlength, BTreesLength):
+            # TOCs prior to 2.7.3 had a BTrees.Length.Length object as
+            # the TOC length object, replace it with our own Length2
+            # that does our conflict resolution correctly:
+            sz = oldlength()
+            self._length = self.getLen = Length2(sz)
+
         # TOCs prior to 2.7.1 took their period from a global
         if not state.has_key('_period'):
             self._period = 20 # this was the default for all prior releases
@@ -891,6 +1002,10 @@
         if not state.has_key('_last_finalized_timeslice'):
             self._last_finalized_timeslice = Increaser(-self._period)
 
+        # TOCs prior to 2.7.3 didn't have a _last_gc_timeslice
+        if not state.has_key('_last_gc_timeslice'):
+            self._last_gc_timeslice = Increaser(-self._period)
+
         # we should probably delete older attributes from state such as
         # '_last_timeslice', '_deindex_next',and '__len__' here but we leave
         # them in order to allow people to switch between 2.6.0->2.7.0 and
@@ -919,6 +1034,22 @@
         l.insert(0, begin + (x * period))
     return l
 
+def roll(low, high, reason):
+    try:
+        result = random.randrange(low, high)
+    except ValueError:
+        # empty range, must win this roll
+        result = low
+
+    if result == low:
+        DEBUG and TLOG('roll: low: %s, high: %s: won with %s (%s)' %
+                       (low, high, result, reason))
+        return True
+    else:
+        DEBUG and TLOG('roll: low: %s, high: %s: lost with %s (%s)' %
+                       (low, high, result, reason))
+        return False
+
 def _assert(case):
     if not case:
         raise AssertionError
@@ -926,8 +1057,8 @@
 class Increaser(Persistent):
     """
     A persistent object representing a typically increasing integer that
-    has conflict resolution uses the greatest integer out of the three
-    available states
+    has conflict resolution which uses the greatest integer out of the three
+    available states.
     """
     def __init__(self, v):
         self.value = v
@@ -947,7 +1078,51 @@
     def _p_resolveConflict(self, old, state1, state2):
         return max(old, state1, state2)
 
-    def _p_independent(self):
-        return 1
 
+class Length2(Persistent):
+    """
+    A persistent object responsible for maintaining a representation of
+    the number of current transient objects.
+
+    Conflict resolution is sensitive to which methods are used to
+    change the length.
+    """
+    def __init__(self, value=0):
+        self.set(value)
+
+    def set(self, value):
+        self.value = value
+        self.floor = 0
+        self.ceiling = value
+
+    def increment(self, delta):
+        """Increase the length by delta.
+
+        Conflict resolution will take the sum of all the increments."""
+        self.ceiling += delta
+        self.value += delta
+
+    def decrement(self, delta):
+        """Decrease the length by delta.
+
+        Conflict resolution will take the highest decrement."""
+        self.floor += delta
+        self.value -= delta
+
+    def __getstate__(self):
+        return self.__dict__
+
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+
+    def __call__(self):
+        return self.value
+
+    def _p_resolveConflict(self, old, saved, new):
+        new['ceiling'] = saved['ceiling'] + new['ceiling'] - old['ceiling']
+        new['floor'] = max(old['floor'], saved['floor'], new['floor'])
+        new['value'] = new['ceiling'] - new['floor']
+        return new
+
 Globals.InitializeClass(TransientObjectContainer)
+

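To make the Length2 conflict-resolution arithmetic concrete, here is a
worked example that mirrors the expectation in testCounters.py below
(increments from concurrent transactions are summed; only the largest
decrement survives):

    from Products.Transience.Transience import Length2

    old   = dict(value=0,  ceiling=0,  floor=0)  # state both txns started from
    saved = dict(value=9,  ceiling=10, floor=1)  # txn A: increment(10); decrement(1)
    new   = dict(value=18, ceiling=20, floor=2)  # txn B: increment(20); decrement(2)

    resolved = Length2(0)._p_resolveConflict(old, saved, new)
    # ceiling = 10 + 20 - 0 = 30; floor = max(0, 1, 2) = 2; value = 28
    assert resolved['value'] == 10 + 20 - max(1, 2)
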
Modified: Zope/trunk/lib/python/Products/Transience/TransientObject.py
===================================================================
--- Zope/trunk/lib/python/Products/Transience/TransientObject.py	2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/TransientObject.py	2004-09-18 03:27:57 UTC (rev 27630)
@@ -16,6 +16,8 @@
 $Id$
 """
 
+__version__='$Revision: 1.9.68.5 $'[11:-2]
+
 from Persistence import Persistent
 from Acquisition import Implicit
 import time, random, sys, os
@@ -192,70 +194,60 @@
     # Other non interface code
     #
 
-    def _p_independent(self):
-        # My state doesn't depend on or materially effect the state of
-        # other objects (eliminates read conflicts).
-        return 1
-
     def _p_resolveConflict(self, saved, state1, state2):
         DEBUG and TLOG('entering TO _p_rc')
         DEBUG and TLOG('states: sv: %s, s1: %s, s2: %s' % (
             saved, state1, state2))
-        try:
-            states = [saved, state1, state2]
+        states = [saved, state1, state2]
 
-            # We can clearly resolve the conflict if one state is invalid,
-            # because it's a terminal state.
-            for state in states:
-                if state.has_key('_invalid'):
-                    DEBUG and TLOG('TO _p_rc: a state was invalid')
-                    return state
-            # The only other times we can clearly resolve the conflict is if
-            # the token, the id, or the creation time don't differ between
-            # the three states, so we check that here.  If any differ, we punt
-            # by raising ConflictError.
-            attrs = ['token', 'id', '_created']
-            for attr in attrs:
-                svattr = saved.get(attr)
-                s1attr = state1.get(attr)
-                s2attr = state2.get(attr)
-                DEBUG and TLOG('TO _p_rc: attr %s: sv: %s s1: %s s2: %s' %
-                               (attr, svattr, s1attr, s2attr))
-                if not svattr==s1attr==s2attr:
-                    DEBUG and TLOG('TO _p_rc: cant resolve conflict')
-                    raise ConflictError
+        # We can clearly resolve the conflict if one state is invalid,
+        # because it's a terminal state.
+        for state in states:
+            if state.has_key('_invalid'):
+                DEBUG and TLOG('TO _p_rc: a state was invalid')
+                return state
 
-            # Now we need to do real work.
-            #
-            # Data in our _container dictionaries might conflict.  To make
-            # things simple, we intentionally create a race condition where the
-            # state which was last modified "wins".  It would be preferable to
-            # somehow merge our _containers together, but as there's no
-            # generally acceptable way to union their states, there's not much
-            # we can do about it if we want to be able to resolve this kind of
-            # conflict.
+        # The only other time we can clearly resolve the conflict is if
+        # the token, the id, or the creation time don't differ between
+        # the three states, so we check that here.  If any differ, we punt
+        # by raising ConflictError.
+        attrs = ['token', 'id', '_created']
+        for attr in attrs:
+            svattr = saved.get(attr)
+            s1attr = state1.get(attr)
+            s2attr = state2.get(attr)
+            DEBUG and TLOG('TO _p_rc: attr %s: sv: %s s1: %s s2: %s' %
+                           (attr, svattr, s1attr, s2attr))
+            if not svattr==s1attr==s2attr:
+                DEBUG and TLOG('TO _p_rc: cant resolve conflict')
+                raise ConflictError
 
-            # We return the state which was most recently modified, if
-            # possible.
-            states.sort(lastmodified_sort)
-            if states[0].get('_last_modified'):
-                DEBUG and TLOG('TO _p_rc: returning last mod state')
-                return states[0]
+        # Now we need to do real work.
+        #
+        # Data in our _container dictionaries might conflict.  To make
+        # things simple, we intentionally create a race condition where the
+        # state which was last modified "wins".  It would be preferable to
+        # somehow merge our _containers together, but as there's no
+        # generally acceptable way to union their states, there's not much
+        # we can do about it if we want to be able to resolve this kind of
+        # conflict.
 
-            # If we can't determine which object to return on the basis
-            # of last modification time (no state has been modified), we return
-            # the object that was most recently accessed (last pulled out of
-            # our parent).  This will return an essentially arbitrary state if
-            # all last_accessed values are equal.
-            states.sort(lastaccessed_sort)
-            DEBUG and TLOG('TO _p_rc: returning last_accessed state')
+        # We return the state which was most recently modified, if
+        # possible.
+        states.sort(lastmodified_sort)
+        if states[0].get('_last_modified'):
+            DEBUG and TLOG('TO _p_rc: returning last mod state')
             return states[0]
-        except ConflictError:
-            raise
-        except:
-            LOG.info('Conflict resolution error in TransientObject',
-                      exc_info=sys.exc_info())
 
+        # If we can't determine which object to return on the basis
+        # of last modification time (no state has been modified), we return
+        # the object that was most recently accessed (last pulled out of
+        # our parent).  This will return an essentially arbitrary state if
+        # all last_accessed values are equal.
+        states.sort(lastaccessed_sort)
+        DEBUG and TLOG('TO _p_rc: returning last_accessed state')
+        return states[0]
+
     getName = getId # this is for SQLSession compatibility
 
     def _generateUniqueId(self):

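The "last modified wins" selection above relies on the module-level
lastmodified_sort comparator, whose body is not shown in this diff.  A
plausible sketch (an assumption, not the checked-in code) sorts state
dictionaries newest-first so that states[0] is the winner:

    def lastmodified_sort(d1, d2):
        # descending order by _last_modified; states lacking it sort last
        return cmp(d2.get('_last_modified', 0), d1.get('_last_modified', 0))
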
Modified: Zope/trunk/lib/python/Products/Transience/dtml/manageTransientObjectContainer.dtml
===================================================================
--- Zope/trunk/lib/python/Products/Transience/dtml/manageTransientObjectContainer.dtml	2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/dtml/manageTransientObjectContainer.dtml	2004-09-18 03:27:57 UTC (rev 27630)
@@ -13,7 +13,7 @@
 (the "data object timeout") after which it will be flushed.
 </p>
 
-<dtml-call nudge><!-- turn the buckets if necessary -->
+<dtml-call housekeep><!-- turn the buckets if necessary -->
 
 <p class="form-label">
 <font color="green">

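When in-band housekeeping is disabled, the housekeep method shown in the
Transience.py diff above is meant to be driven by an external scheduler.
One possible driver (the URL and interval are illustrative; keep the
interval well under self._period * (SPARE_BUCKETS - 1) seconds):

    import time, urllib

    HOUSEKEEP_URL = 'http://localhost:8080/temp_folder/session_data/housekeep'
    INTERVAL = 60  # seconds between out-of-band housekeeping runs

    while 1:
        try:
            urllib.urlopen(HOUSEKEEP_URL).read()  # triggers _housekeep()
        except IOError:
            pass  # a missed run is tolerated; try again next interval
        time.sleep(INTERVAL)
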
Added: Zope/trunk/lib/python/Products/Transience/tests/testCounters.py
===================================================================
--- Zope/trunk/lib/python/Products/Transience/tests/testCounters.py	2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/tests/testCounters.py	2004-09-18 03:27:57 UTC (rev 27630)
@@ -0,0 +1,99 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+
+import os
+from unittest import TestCase, TestSuite, makeSuite
+from ZODB.POSException import ConflictError
+from ZODB.FileStorage import FileStorage
+from ZODB.DB import DB
+
+from Products.Transience.Transience import Length2, Increaser
+
+class Base(TestCase):
+    db = None
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        if self.db is not None:
+            self.db.close()
+            self.storage.cleanup()
+
+    def openDB(self):
+        n = 'fs_tmp__%s' % os.getpid()
+        self.storage = FileStorage(n)
+        self.db = DB(self.storage)
+
+class TestLength2(Base):
+
+    def testConflict(self):
+        # this test fails on the HEAD (MVCC?)
+        self.openDB()
+        length = Length2(0)
+
+        r1 = self.db.open().root()
+        r1['ob'] = length
+        get_transaction().commit()
+
+        r2 = self.db.open().root()
+        copy = r2['ob']
+        # The following ensures that copy is loaded.
+        self.assertEqual(copy(),0)
+
+        # First transaction.
+        length.increment(10)
+        length.decrement(1)
+        get_transaction().commit()
+
+        # Second transaction.
+        length = copy
+        length.increment(20)
+        length.decrement(2)
+        get_transaction().commit()
+
+        self.assertEqual(length(), 10+20-max(1,2))
+
+class TestIncreaser(Base):
+
+    def testConflict(self):
+        self.openDB()
+        increaser = Increaser(0)
+
+        r1 = self.db.open().root()
+        r1['ob'] = increaser
+        get_transaction().commit()
+
+        r2 = self.db.open().root()
+        copy = r2['ob']
+        # The following ensures that copy is loaded.
+        self.assertEqual(copy(),0)
+
+        # First transaction.
+        increaser.set(10)
+        get_transaction().commit()
+
+
+        # Second transaction.
+        increaser = copy
+        increaser.set(20)
+        get_transaction().commit()
+
+        self.assertEqual(increaser(), 20)
+
+def test_suite():
+    suite = TestSuite()
+    suite.addTest(makeSuite(TestLength2))
+    suite.addTest(makeSuite(TestIncreaser))
+    return suite

Added: Zope/trunk/lib/python/Products/Transience/tests/testTransactionHelper.py
===================================================================
--- Zope/trunk/lib/python/Products/Transience/tests/testTransactionHelper.py	2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/tests/testTransactionHelper.py	2004-09-18 03:27:57 UTC (rev 27630)
@@ -0,0 +1,40 @@
+##############################################################################
+#
+# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import sys, os, time, random, unittest
+
+if __name__ == "__main__":
+    sys.path.insert(0, '../../..')
+
+import ZODB
+from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
+from Products.Transience.TransactionHelper import PreventTransactionCommit, \
+         makeTransactionUncommittable
+
+class TestTransactionHelper(TestCase):
+    def setUp(self):
+        self.t = get_transaction()
+
+    def tearDown(self):
+        self.t = None
+
+    def testUncommittable(self):
+        makeTransactionUncommittable(self.t, "test")
+        self.assertRaises(PreventTransactionCommit, get_transaction().commit)
+        
+def test_suite():
+    suite = makeSuite(TestTransactionHelper, 'test')
+    return suite
+
+if __name__ == '__main__':
+    runner = TextTestRunner(verbosity=9)
+    runner.run(test_suite())

Modified: Zope/trunk/lib/python/Products/Transience/tests/testTransientObjectContainer.py
===================================================================
--- Zope/trunk/lib/python/Products/Transience/tests/testTransientObjectContainer.py	2004-09-17 22:31:32 UTC (rev 27629)
+++ Zope/trunk/lib/python/Products/Transience/tests/testTransientObjectContainer.py	2004-09-18 03:27:57 UTC (rev 27630)
@@ -17,7 +17,7 @@
 
 import ZODB
 from Products.Transience.Transience import TransientObjectContainer,\
-     MaxTransientObjectsExceeded
+     MaxTransientObjectsExceeded, SPARE_BUCKETS, getCurrentTimeslice
 from Products.Transience.TransientObject import TransientObject
 import Products.Transience.Transience
 import Products.Transience.TransientObject
@@ -380,6 +380,18 @@
         fauxtime.sleep(180)
         self.assertEqual(len(self.t.keys()), 100)
 
+    def testGarbageCollection(self):
+        # this is pretty implementation-dependent :-(
+        for x in range(0, 100):
+            self.t[x] = x
+        sleeptime = self.period * SPARE_BUCKETS
+        fauxtime.sleep(sleeptime)
+        self.t._invoke_finalize_and_gc()
+        max_ts = self.t._last_finalized_timeslice()
+        keys = list(self.t._data.keys())
+        for k in keys:
+            self.assert_(k > max_ts, "k %s <= max_ts %s" % (k, max_ts))
+
     def _maxOut(self):
         for x in range(11):
             self.t.new(str(x))


