[Zodb-checkins] CVS: ZODB3/bsddb3Storage/bsddb3Storage - BerkeleyBase.py:1.23

Barry Warsaw barry@wooz.org
Tue, 19 Nov 2002 14:48:09 -0500


Update of /cvs-repository/ZODB3/bsddb3Storage/bsddb3Storage
In directory cvs.zope.org:/tmp/cvs-serv28284

Modified Files:
	BerkeleyBase.py 
Log Message:
Based on suggestions by Toby, we're now throwing checkpointing into a
thread, instead of doing it every nth ZODB transaction.  We can
actually provide a base class for both the checkpointing and
autopacking threads here.  Changes here include:

SLEEP_TIME: how often (in seconds) a background thread should wake up
to see if there's work to do.

True, False: Add these for older Pythons.

PackStop: New class that acts as an escape hatch for pack operations.

BerkeleyConfig:
    - change interval from 100 to 0.  Now that this controls the
      checkpointing thread, the current default is to not spawn the
      thread.  I'm probably going to change this once I figure out
      what a good value is.

BerkeleyBase:
    - __init__(): Start the checkpointing thread if interval > 0

    - _setupDB(): Add some additional keyword args so that the QUEUE
      style tables can use this convenience method too.

    - close(): Be sure to stop and join the checkpointing thread

    - _docheckpoint(): Removed

    - _withtxn(): Catch PackStop escape hatch exceptions.  This one
      aborts the current Berkeley transaction but eats the exception.
      Also, don't call _docheckpoint() here.

    - docheckpoint(): New method which the checkpointing threads can
      call.

env_from_string(): use DB_RECOVER_FATAL for autorecovery on open.

_WorkThread: New base class for the checkpointing and autopacking
threads.

_CheckPoint: The common checkpointing thread class.


=== ZODB3/bsddb3Storage/bsddb3Storage/BerkeleyBase.py 1.22 => 1.23 ===
--- ZODB3/bsddb3Storage/bsddb3Storage/BerkeleyBase.py:1.22	Mon Nov 11 15:56:30 2002
+++ ZODB3/bsddb3Storage/bsddb3Storage/BerkeleyBase.py	Tue Nov 19 14:48:08 2002
@@ -14,9 +14,12 @@
 
 """Base class for BerkeleyStorage implementations.
 """
+__version__ = '$Revision$'.split()[-2:][0]
 
 import os
+import time
 import errno
+import threading
 from types import StringType
 
 # This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
@@ -30,10 +33,27 @@
 from ZODB.BaseStorage import BaseStorage
 from ZODB.referencesf import referencesf
 import ThreadLock
+import zLOG
 
 GBYTES = 1024 * 1024 * 1000
 
-__version__ = '$Revision$'.split()[-2:][0]
+# Maximum number of seconds for a background thread to sleep before checking
+# to see if there is work to do (checkpointing or autopacking).  Lower
+# numbers mean more processing, higher numbers mean less responsiveness to
+# shutdown requests.  10 seconds seems like a good compromise.  Note that if
+# the check interval is less than the sleep time, the minimum will be used.
+SLEEP_TIME = 10
+
+try:
+    True, False
+except NameError:
+    True = 1
+    False = 0
+
+
+
+class PackStop(Exception):
+    """Escape hatch for pack operations."""
 
 
 
@@ -53,9 +73,10 @@
 
     The following checkpointing attributes are supported:
 
-    - interval indicates the approximate number of Berkeley transaction
-      commits and aborts after which a checkpoint is performed.  Berkeley
-      transactions are performed after ZODB aborts, commits, and stores.
+    - interval indicates how often, in seconds, a Berkeley checkpoint is
+      performed.  If this is non-zero, checkpointing is performed by a
+      background thread.  Otherwise checkpointing will only be done when the
+      storage is closed.   You really want to enable checkpointing. ;)
 
     - kbytes is passed directly to txn_checkpoint()
 
@@ -98,7 +119,7 @@
       never automatically do classic packs.  For Minimal storage, this value
       is ignored -- all packs are classic packs.
     """
-    interval = 100
+    interval = 0
     kbyte = 0
     min = 0
     logdir = None
@@ -142,12 +163,10 @@
         Optional config must be a BerkeleyConfig instance, or None, which
         means to use the default configuration options.
         """
-
         # sanity check arguments
         if config is None:
             config = BerkeleyConfig()
         self._config = config
-        self._config._counter = 0
 
         if name == '':
             raise TypeError, 'database name is empty'
@@ -167,24 +186,36 @@
         # Instantiate a pack lock
         self._packlock = ThreadLock.allocate_lock()
         self._autopacker = None
+        self._stop = False
         # Initialize a few other things
         self._prefix = prefix
         # Give the subclasses a chance to interpose into the database setup
         # procedure
+        self._tables = []
         self._setupDBs()
         # Initialize the object id counter.
         self._init_oid()
+        if config.interval > 0:
+            self._checkpointer = _Checkpoint(self, config.interval)
+            self._checkpointer.start()
+        else:
+            self._checkpointer = None
 
-    def _setupDB(self, name, flags=0):
+    def _setupDB(self, name, flags=0, dbtype=db.DB_BTREE, reclen=None):
         """Open an individual database with the given flags.
 
         flags are passed directly to the underlying DB.set_flags() call.
+        Optional dbtype specifies the type of BerkeleyDB access method to
+        use.  Optional reclen if not None gives the record length.
         """
         d = db.DB(self._env)
         if flags:
             d.set_flags(flags)
         # Our storage is based on the underlying BSDDB btree database type.
-        d.open(self._prefix + name, db.DB_BTREE, db.DB_CREATE)
+        if reclen is not None:
+            d.set_re_len(reclen)
+        d.open(self._prefix + name, dbtype, db.DB_CREATE)
+        self._tables.append(d)
         return d
 
     def _setupDBs(self):
@@ -270,6 +301,14 @@
         """Close the storage by closing the databases it uses and by closing
         its environment.
         """
+        # Close all the tables
+        if self._checkpointer:
+            zLOG.LOG('Full storage', zLOG.INFO,
+                     'stopping checkpointing thread')
+            self._checkpointer.stop()
+            self._checkpointer.join(SLEEP_TIME * 2)
+        for d in self._tables:
+            d.close()
         # As recommended by Keith Bostic @ Sleepycat, we need to do
         # two checkpoints just before we close the environment.
         # Otherwise, auto-recovery on environment opens can be
@@ -285,15 +324,6 @@
         self._env.close()
         os.unlink(lockfile)
 
-    def _docheckpoint(self):
-        # Periodically checkpoint the database.  This is called approximately
-        # once per Berkeley transaction commit or abort.
-        config = self._config
-        config._counter += 1
-        if config._counter > config.interval:
-            self._env.txn_checkpoint(config.kbyte, config.min)
-            config._counter = 0
-
     def _update(self, deltas, data, incdec):
         refdoids = []
         referencesf(data, refdoids)
@@ -316,16 +346,27 @@
         txn = self._env.txn_begin()
         try:
             ret = meth(txn, *args, **kws)
+        except PackStop:
+            # Escape hatch for shutdown during pack.  Like the bare except --
+            # i.e. abort the transaction -- but swallow the exception.
+            txn.abort()
         except:
             #import traceback ; traceback.print_exc()
             txn.abort()
-            self._docheckpoint()
             raise
         else:
             txn.commit()
-            self._docheckpoint()
             return ret
 
+    def docheckpoint(self):
+        config = self._config
+        self._lock_acquire()
+        try:
+            if not self._stop:
+                self._env.txn_checkpoint(config.kbyte, config.min)
+        finally:
+            self._lock_release()
+
 
 
 def env_from_string(envname, config):
@@ -356,10 +397,55 @@
     gbytes, bytes = divmod(config.cachesize, GBYTES)
     env.set_cachesize(gbytes, bytes)
     env.open(envname,
-             db.DB_CREATE       # create underlying files as necessary
-             | db.DB_RECOVER    # run normal recovery before opening
-             | db.DB_INIT_MPOOL # initialize shared memory buffer pool
-             | db.DB_INIT_TXN   # initialize transaction subsystem
-             | db.DB_THREAD     # we use the environment from other threads
+             db.DB_CREATE          # create underlying files as necessary
+             | db.DB_RECOVER_FATAL # run catastrophic recovery on open
+             | db.DB_INIT_MPOOL    # initialize shared memory buffer pool
+             | db.DB_INIT_TXN      # initialize transaction subsystem
+             | db.DB_THREAD        # we use the environment from other threads
              )
     return env, lockfile
+
+
+
+class _WorkThread(threading.Thread):
+    def __init__(self, storage, checkinterval, name='work'):
+        threading.Thread.__init__(self)
+        self._storage = storage
+        self._interval = checkinterval
+        self._name = name
+        # Bookkeeping
+        self._stop = False
+        self._nextcheck = checkinterval
+        # We don't want these threads to hold up process exit.  That could
+        # lead to corrupt databases, but recovery should ultimately save us.
+        self.setDaemon(True)
+
+    def run(self):
+        name = self._name
+        zLOG.LOG('Berkeley storage', zLOG.INFO, '%s thread started' % name)
+        while not self._stop:
+            now = time.time()
+            if now > self._nextcheck:
+                zLOG.LOG('Berkeley storage', zLOG.INFO, 'running %s' % name)
+                self._dowork(now)
+                self._nextcheck = now + self._interval
+            # Now we sleep for a little while before we check again.  Sleep
+            # for the minimum of self._interval and SLEEP_TIME so as to be as
+            # responsive as possible to .stop() calls.
+            time.sleep(min(self._interval, SLEEP_TIME))
+        zLOG.LOG('Berkeley storage', zLOG.INFO, '%s thread finished' % name)
+
+    def stop(self):
+        self._stop = True
+        
+    def _dowork(self):
+        pass
+
+
+
+class _Checkpoint(_WorkThread):
+    def __init__(self, storage, interval):
+        _WorkThread.__init__(self, storage, interval, 'checkpointing')
+
+    def _dowork(self, now):
+        self._storage.docheckpoint()