[Zope-Checkins] CVS: Products/Transience - Transience.py:1.32.12.8.2.4

Chris McDonough chrism at plope.com
Mon Sep 13 11:20:27 EDT 2004


Update of /cvs-repository/Products/Transience
In directory cvs.zope.org:/tmp/cvs-serv19035

Modified Files:
      Tag: chrism-pre273-branch
	Transience.py 
Log Message:
Add a knob (not exposed in the UI) to turn off "inband" housekeeping.
Housekeeping can now optionally be done using an external scheduling
facility by calling the "housekeep" method regularly.

Break out the actual work that _gc and _finalize do into separate _do
methods for legibility.

Don't raise Retry in _replentish if we're in a bucket shortage and we
can't get the lock.  Instead, just soldier on and let the conflict
happen naturally.

Create a "roll" function and attempt to prevent conflicts in _gc by
using a roll.

Remove the "nudge" function in favor of "housekeep".

Set SPARE_BUCKETS back to 15.
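
For illustration, a minimal sketch of an out-of-band housekeeping
driver once inband housekeeping is disabled.  The run_housekeeping
function and the way the container is obtained are hypothetical;
disableInbandHousekeeping, housekeep, _period, and SPARE_BUCKETS come
from this change:

    import time

    SPARE_BUCKETS = 15  # mirrors the new default in Transience.py

    def run_housekeeping(container):
        # Per the housekeep() docstring, call it at least every
        # _period * (SPARE_BUCKETS - 1) seconds; halving that interval
        # leaves headroom so a late tick can't cause an emergency
        # bucket shortage.
        interval = (container._period * (SPARE_BUCKETS - 1)) / 2
        container.disableInbandHousekeeping()
        while 1:
            container.housekeep()  # _finalize, _replentish, _gc
            time.sleep(interval)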



=== Products/Transience/Transience.py 1.32.12.8.2.3 => 1.32.12.8.2.4 ===
--- Products/Transience/Transience.py:1.32.12.8.2.3	Sat Sep 11 21:58:56 2004
+++ Products/Transience/Transience.py	Mon Sep 13 11:19:51 2004
@@ -54,7 +54,7 @@
 ACCESS_TRANSIENTS_PERM = 'Access Transient Objects'
 MANAGE_CONTAINER_PERM = 'Manage Transient Object Container'
 
-SPARE_BUCKETS = 1 # minimum number of buckets to keep "spare"
+SPARE_BUCKETS = 15 # minimum number of buckets to keep "spare"
 BUCKET_CLASS = OOBTree # constructor for buckets
 DATA_CLASS = IOBTree # const for main data structure (timeslice->"bucket")
 STRICT = os.environ.get('Z_TOC_STRICT', '')
@@ -130,6 +130,7 @@
 
     _limit = 0
     _data = None
+    _inband_housekeeping = True
 
     security.setDefaultAccess('deny')
 
@@ -274,10 +275,18 @@
             bucket = self._data.get(0)
             return bucket.get(k, default)
 
-        # do housekeeping
-        self._finalize(current_ts)
-        self._replentish(current_ts)
-        self._gc(current_ts)
+        if self._inband_housekeeping:
+            self._housekeep(current_ts)
+
+        else:
+            # don't allow the TOC to stop working in an emergency bucket
+            # shortage
+            max_ts = self._max_timeslice()
+            low = now/self._period
+            high = max_ts/self._period
+            required = high <= low
+            if required:
+                self._replentish(current_ts)
 
         # SUBTLETY ALERT TO SELF: do not "improve" the code below
         # unnecessarily, as it will end only in tears.  The lack of aliases
@@ -348,10 +357,12 @@
         else:
             current_ts = 0
 
-        # do housekeeping
-        self._finalize(current_ts)
-        self._replentish(current_ts)
-        self._gc(current_ts)
+        if self._inband_housekeeping:
+            self._housekeep(current_ts)
+
+        elif self._in_emergency_bucket_shortage(current_ts)[0]:
+            # if our scheduler fails, don't allow the TOC to stop working
+            self._replentish(current_ts, force=True)
 
         STRICT and _assert(self._data.has_key(current_ts))
         current = self._getCurrentSlices(current_ts)
@@ -504,20 +515,22 @@
 
         # The nature of sessioning is that when the timeslice rolls
         # over, all active threads will try to do a lot of work during
-        # finalization, all but one unnecessarily.  We really don't
-        # want more than one thread at a time to try to finalize
-        # buckets at the same time so we try to lock. We give up if we
-        # can't lock immediately because it doesn't matter if we skip
-        # a couple of opportunities for finalization, as long as it
-        # gets done by some thread eventually.  A similar pattern
-        # exists for _gc and _replentish.
+        # finalization if inband housekeeping is enabled, all but one
+        # unnecessarily.  We really don't want more than one thread at
+        # a time to try to finalize buckets at the same time so we try
+        # to lock. We give up if we can't lock immediately because it
+        # doesn't matter if we skip a couple of opportunities for
+        # finalization, as long as it gets done by some thread
+        # eventually.  A similar pattern exists for _gc and
+        # _replentish.
 
         if not self.finalize_lock.acquire(0):
-            DEBUG and TLOG('_finalize: couldnt acquire lock')
+            DEBUG and TLOG('_finalize: could not acquire lock, returning')
             return
 
         try:
             DEBUG and TLOG('_finalize: lock acquired successfully')
+            last_finalized = self._last_finalized_timeslice()
 
             if now is None:
                 now = getCurrentTimeslice(self._period) # for unit tests
@@ -525,7 +538,7 @@
             # we want to start finalizing from one timeslice after the
             # timeslice which we last finalized.
             
-            start_finalize  = self._last_finalized_timeslice() + self._period
+            start_finalize  = last_finalized + self._period
 
             # we want to finalize only up to the maximum expired timeslice
             max_ts = self._get_max_expired_ts(now)
@@ -535,47 +548,62 @@
                     '_finalize: start_finalize (%s) >= max_ts (%s), '
                     'doing nothing' % (start_finalize, max_ts))
                 return
-        
-            DEBUG and TLOG('_finalize: now is %s' % now)
-            DEBUG and TLOG('_finalize: max_ts is %s' % max_ts)
-            DEBUG and TLOG('_finalize: start_finalize is %s' % start_finalize)
+            else:
+                DEBUG and TLOG(
+                    '_finalize: start_finalize (%s) <= max_ts (%s), '
+                    'finalization possible' % (start_finalize, max_ts))
+                # we don't try to avoid conflicts here by doing a "random"
+                # dance (ala _replentish and _gc) because it's important that
+                # buckets are finalized as soon as possible after they've
+                # expired in order to call the delete notifier "on time".
+                self._do_finalize_work(now, max_ts, start_finalize)
 
-            to_finalize = list(self._data.keys(start_finalize, max_ts))
-            DEBUG and TLOG('_finalize: to_finalize is %s' % `to_finalize`)
+        finally:
+            self.finalize_lock.release()
 
-            delta = 0
+    def _do_finalize_work(self, now, max_ts, start_finalize):
+        # this is only separated from _finalize for readability; it
+        # should generally not be called by anything but _finalize
+        DEBUG and TLOG('_do_finalize_work: entering')
+        DEBUG and TLOG('_do_finalize_work: now is %s' % now)
+        DEBUG and TLOG('_do_finalize_work: max_ts is %s' % max_ts)
+        DEBUG and TLOG('_do_finalize_work: start_finalize is %s' %
+                       start_finalize)
 
-            for key in to_finalize:
+        to_finalize = list(self._data.keys(start_finalize, max_ts))
+        DEBUG and TLOG('_do_finalize_work: to_finalize is %s' % `to_finalize`)
 
-                assert(start_finalize <= key <= max_ts)
-                STRICT and _assert(self._data.has_key(key))
-                values = list(self._data[key].values())
-                DEBUG and TLOG('_finalize: values to notify from ts %s '
-                               'are %s' % (key, `list(values)`))
+        delta = 0
 
-                delta += len(values)
+        for key in to_finalize:
 
-                for v in values:
-                    self.notifyDel(v)
+            assert(start_finalize <= key <= max_ts)
+            STRICT and _assert(self._data.has_key(key))
+            values = list(self._data[key].values())
+            DEBUG and TLOG('_do_finalize_work: values to notify from ts %s '
+                           'are %s' % (key, `list(values)`))
 
-            if delta:
-                self._length.change(-delta)
+            delta += len(values)
 
-            DEBUG and TLOG('_finalize: setting _last_finalized_timeslice '
-                           'to max_ts of %s' % max_ts)
+            for v in values:
+                self.notifyDel(v)
 
-            self._last_finalized_timeslice.set(max_ts)
+        if delta:
+            self._length.change(-delta)
+
+        DEBUG and TLOG('_do_finalize_work: setting _last_finalized_timeslice '
+                       'to max_ts of %s' % max_ts)
+
+        self._last_finalized_timeslice.set(max_ts)
 
-        finally:
-            self.finalize_lock.release()
 
     def _replentish(self, now):
         """ Add 'fresh' future or current buckets """
         if not self._timeout_slices:
-            return # do nothing if no timeout
+            DEBUG and TLOG('_replentish: no timeout, doing nothing')
+            return
         
         max_ts = self._max_timeslice()
-
         low = now/self._period
         high = max_ts/self._period
 
@@ -585,51 +613,49 @@
         # so we *really* need to replentish (having a current bucket is
         # an invariant for continued operation).
 
-        optional = not (high <= low)
-
-        if not self.replentish_lock.acquire(0):
-
-            if not optional:
-                DEBUG and TLOG('_replentish: no current bucket but cant aq '
-                               'lock, making txn uncommittable + retry')
-                raise Retry
-
-            else:
-                DEBUG and TLOG('_replentish: couldnt acquire lock, returning')
-                return
+        required = high <= low
+        lock_acquired = self.replentish_lock.acquire(0)
 
         try:
-            if optional:
-                DEBUG and TLOG('_replentish: attempting optional replentish')
-                # We're not in an emergency bucket shortage, so we don't 
-                # explicitly need to replentish with fresh new buckets.
-                # Minimize the chance that two threads will attempt to
-                # do housekeeping at the same time (which causes conflicts)
-                # by introducing a random element.
-                if random.randrange(low, high) == high: # do nothing
-                    DEBUG and TLOG('_replentish: lost random selection '
-                                   'in optional replentish, returning')
-                    return
+            if required:
+                # we're in an emergency bucket shortage, we need to
+                # replentish regardless of whether we got the lock or
+                # not.  (if we didn't get the lock, this transaction
+                # will likely result in a conflict error, that's ok)
+                if lock_acquired:
+                    DEBUG and TLOG('_replentish: required, lock acquired')
                 else:
-                    DEBUG and TLOG('_replentish: won random selection '
-                                   'in optional replentish, continuing')
+                    DEBUG and TLOG('_replentish: required, lock NOT acquired')
+                self._do_replentish_work(now, max_ts)
+
+            elif lock_acquired:
+                # If replentish is optional, minimize the chance that
+                # two threads will attempt to do replentish work at
+                # the same time (which causes conflicts) by
+                # introducing a random element.
+                DEBUG and TLOG('_replentish: attempting optional replentish '
+                               '(lock acquired)')
+                if roll(low, high, 'optional replentish'):
                     self._do_replentish_work(now, max_ts)
 
             else:
-                # we're in an emergency bucket shortage, we need to replentish
-                DEBUG and TLOG('_replentish: forcing replentish '
-                               '(no current bucket)')
-                self._do_replentish_work(now, max_ts)
+                # This is an optional replentish and we can't acquire
+                # the lock, bail.
+                DEBUG and TLOG('_replentish: optional replentish attempt '
+                               'aborted, could not acquire lock.')
+                return
 
         finally:
-            self.replentish_lock.release()
+            if lock_acquired:
+                self.replentish_lock.release()
 
     def _do_replentish_work(self, now, max_ts):
-        # this is only separated from _replentish for readability;
-        # it shouldn't be called without the replentish lock being held
+        DEBUG and TLOG('_do_replentish_work: entering')
+        # this is only separated from _replentish for readability; it
+        # should generally not be called by anything but _replentish
 
-        # available_spares == the number of "spare" buckets that exist in
-        # "_data"
+        # available_spares == the number of "spare" buckets that exist
+        # in "_data"
         available_spares = (max_ts - now) / self._period
         DEBUG and TLOG('_do_replentish_work: now = %s' % now)
         DEBUG and TLOG('_do_replentish_work: max_ts = %s' % max_ts)
@@ -698,25 +724,40 @@
                                '( (%s - %s) < %s )' % (now, last_gc, gc_every))
                 return
 
-            DEBUG and TLOG('_gc: gc attempt proceeding')
+            if now <= 0:
+                DEBUG and TLOG('_gc: now is %s, bailing' % now)
+                return
+
+            multiplier = now / float(now - last_gc)
+            high = int(round(multiplier * 2))
+
+            DEBUG and TLOG(
+                '_gc: (%s - %s) > %s, gc possible' % (now, last_gc, gc_every))
+            if roll(0, high, 'gc'):
+                self._do_gc_work(now)
+
+        finally:
+            self.gc_lock.release()
 
-            # we garbage collect any buckets that have already been run
-            # through finalization
+    def _do_gc_work(self, now):
+        # this is only separated from _gc for readability; it should
+        # generally not be called by anything but _gc
 
-            max_ts = self._last_finalized_timeslice()
+        # we garbage collect any buckets that have already been run
+        # through finalization
+        DEBUG and TLOG('_do_gc_work: entering')
 
-            DEBUG and TLOG('_gc: max_ts is %s' % max_ts)
+        max_ts = self._last_finalized_timeslice()
 
-            for key in list(self._data.keys(None, max_ts)):
-                assert(key <= max_ts)
-                STRICT and _assert(self._data.has_key(key))
-                DEBUG and TLOG('deleting %s from _data' % key)
-                del self._data[key]
+        DEBUG and TLOG('_gc: max_ts is %s' % max_ts)
 
-            self._last_gc_timeslice.set(now)
+        for key in list(self._data.keys(None, max_ts)):
+            assert(key <= max_ts)
+            STRICT and _assert(self._data.has_key(key))
+            DEBUG and TLOG('deleting %s from _data' % key)
+            del self._data[key]
 
-        finally:
-            self.gc_lock.release()
+        self._last_gc_timeslice.set(now)
 
     def notifyAdd(self, item):
         DEBUG and TLOG('notifyAdd with %s' % item)
@@ -853,11 +894,33 @@
     def setDelNotificationTarget(self, f):
         self._delCallback = f
 
-    security.declareProtected(MGMT_SCREEN_PERM, 'nudge')
-    def nudge(self):
-        """ Used by mgmt interface to maybe do housekeeping each time
-        a screen is shown """
-        now = getCurrentTimeslice(self._period)
+    security.declareProtected(MGMT_SCREEN_PERM, 'disableInbandHousekeeping')
+    def disableInbandHousekeeping(self):
+        """ No longer perform inband housekeeping """
+        self._inband_housekeeping = False
+
+    security.declareProtected(MGMT_SCREEN_PERM, 'enableInbandHousekeeping')
+    def enableInbandHousekeeping(self):
+        """ (Re)enable inband housekeeping """
+        self._inband_housekeeping = True
+
+    security.declareProtected(MGMT_SCREEN_PERM, 'isInbandHousekeepingEnabled')
+    def isInbandHousekeepingEnabled(self):
+        """ Report if inband housekeeping is enabled """
+        return self._inband_housekeeping
+
+    security.declareProtected('View', 'housekeep')
+    def housekeep(self):
+        """ Call this from a scheduler at least every
+        self._period * (SPARE_BUCKETS - 1) seconds to perform out of band
+        housekeeping """
+        # we can protect this method from being called too often by
+        # anonymous users as necessary in the future; we already have a lot
+        # of protection as-is though so no need to make it more complicated
+        # than necessary at the moment
+        self._housekeep(getCurrentTimeslice(self._period))
+
+    def _housekeep(self, now):
         self._finalize(now)
         self._replentish(now)
         self._gc(now)
@@ -948,6 +1011,22 @@
         l.insert(0, begin + (x * period))
     return l
 
+def roll(low, high, reason):
+    try:
+        result = random.randrange(low, high)
+    except ValueError:
+        # empty range, must win this roll
+        result = low
+
+    if result == low:
+        DEBUG and TLOG('roll: low: %s, high: %s: won with %s (%s)' %
+                       (low, high, result, reason))
+        return True
+    else:
+        DEBUG and TLOG('roll: low: %s, high: %s: lost with %s (%s)' %
+                       (low, high, result, reason))
+        return False
+
 def _assert(case):
     if not case:
         raise AssertionError
@@ -975,8 +1054,5 @@
 
     def _p_resolveConflict(self, old, state1, state2):
         return max(old, state1, state2)
-
-##     def _p_independent(self):
-##         return 1
 
 Globals.InitializeClass(TransientObjectContainer)
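
A footnote on the conflict-avoidance idea behind roll(): each of N
threads contending for the same optional housekeeping step wins with
probability roughly 1/(high - low), so on average only one proceeds
and ZODB write conflicts stay rare while the work still gets done
eventually.  A standalone sketch (roll is abbreviated from the patch
above, DEBUG logging dropped; maybe_replentish is hypothetical):

    import random

    def roll(low, high, reason):
        # win iff randrange picks the low end of [low, high); an
        # empty range (low >= high) always wins
        try:
            result = random.randrange(low, high)
        except ValueError:
            result = low
        return result == low

    def maybe_replentish(low, high, do_work):
        # with e.g. low = 5 and high = 20, each thread does the work
        # with probability 1/15; losers skip this opportunity and a
        # later thread (or tick) picks it up
        if roll(low, high, 'optional replentish'):
            do_work()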


