[Zodb-checkins] CVS: ZODB4/BDBStorage - BDBFullStorage.py:2.2 BDBMinimalStorage.py:2.1 BerkeleyBase.py:2.2

Barry Warsaw <barry@wooz.org>
Fri, 6 Dec 2002 15:41:30 -0500


Update of /cvs-repository/ZODB4/BDBStorage
In directory cvs.zope.org:/tmp/cvs-serv28835/BDBStorage

Modified Files:
	BDBFullStorage.py BDBMinimalStorage.py BerkeleyBase.py 
Log Message:
Port changes from ZODB3

Simplify the WorkThread API again.  Don't use a non-portable poll
object -- because we only care about a single event, a threading.Event
object suffices.  Specific changes include (a distilled sketch of the
new pattern follows the per-file notes):

BerkeleyBase.py

    __init__(): Simplify the creation of the checkpointer and
    autopacker worker threads.

    close(): Replace autopacker.stop() with setting the Event object.
    Setting the event both kicks us out of the wait() and sets the
    thread's internal stop flag, so it's all we need.

    _WorkThread.__init__(): Take the `name' argument out of the
    constructor.  It was the only thing that two of the three subclasses
    needed to override, so just stick it in a class attribute.

    run(): Simplify to use the Event object.  Also, change _nextcheck
    to recalculate `now' after the work is done.  There's no telling
    how much time the work will take (it may not matter much in
    practice).

    stop(): Removed.

    _Checkpoint.__init__(): Removed

BDBFullStorage.py

    _make_autopacker(): Updated

    _Autopack.__init__(): Updated

BDBMinimalStorage.py

    _make_autopacker(): Updated

    _Autopack.__init__(): Removed

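For reference, here is a distilled sketch of the new Event-driven worker
loop (condensed from the BerkeleyBase.py diff below; the names follow the
real code, but this is an illustration, not the committed source):

    import time
    import threading

    class _WorkThread(threading.Thread):
        NAME = 'worker'

        def __init__(self, storage, event, checkinterval):
            threading.Thread.__init__(self)
            self._storage = storage
            self._event = event
            self._interval = checkinterval
            self._stop = False
            self._nextcheck = checkinterval
            # Don't let this thread hold up process exit
            self.setDaemon(True)

        def run(self):
            while not self._stop:
                if time.time() >= self._nextcheck:
                    self._dowork()
                    # Recalculate after the work is done; it may have
                    # taken a while
                    self._nextcheck = time.time() + self._interval
                # Block with a timeout instead of polling.  event.set()
                # wakes the wait() immediately and doubles as the stop
                # flag.
                self._event.wait(self._interval)
                self._stop = self._event.isSet()

        def _dowork(self):
            pass                    # overridden by subclasses

    # Shutdown, from the owning storage's close():
    #     event.set()              # wakes the wait() and flags the stop
    #     worker.join(JOIN_TIME)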

=== ZODB4/BDBStorage/BDBFullStorage.py 2.1 => 2.2 ===
--- ZODB4/BDBStorage/BDBFullStorage.py:2.1	Wed Dec  4 14:54:49 2002
+++ ZODB4/BDBStorage/BDBFullStorage.py	Fri Dec  6 15:41:30 2002
@@ -17,9 +17,7 @@
 
 __version__ = '$Revision$'.split()[-2:][0]
 
-import sys
 import time
-import threading
 import cPickle as pickle
 from struct import pack, unpack
 
@@ -225,15 +223,14 @@
         self._delqueue = self._setupDB('delqueue', 0, db.DB_QUEUE, 8)
         # Do recovery and consistency checks
         self._withlock(self._dorecovery)
-        # Set up the autopacking thread
+
+    def _make_autopacker(self, event):
         config = self._config
-        if config.frequency > 0:
-            lastpacktime = u64(self._last_packtime())
-            self._autopacker = _Autopack(
-                self, config.frequency,
-                config.packtime, config.classicpack,
-                lastpacktime)
-            self._autopacker.start()
+        lastpacktime = u64(self._last_packtime())
+        return _Autopack(
+            self, event,
+            config.frequency, config.packtime, config.classicpack,
+            lastpacktime)
 
     def _dorecovery(self):
         # If these tables are non-empty, it means we crashed during a pack
@@ -1798,7 +1795,10 @@
             self.status = ' '
         self.user = user
         self.description = desc
-        self._extension = ext
+        try:
+            self._extension = pickle.loads(ext)
+        except EOFError:
+            self._extension = {}
         # Internal pointer
         self._oids = self._storage._alltxnoids(self.tid)
         # To make .pop() more efficient
@@ -1840,16 +1840,18 @@
 
 
 class _Autopack(_WorkThread):
-    def __init__(self, storage, frequency, packtime, classicpack,
+    NAME = 'autopacking'
+
+    def __init__(self, storage, event,
+                 frequency, packtime, classicpack,
                  lastpacktime):
-        _WorkThread.__init__(self, storage, frequency, 'autopacking')
+        _WorkThread.__init__(self, storage, event, frequency)
         self._packtime = packtime
         self._classicpack = classicpack
         # Bookkeeping
-        self._stop = False
         self._lastclassic = 0
 
-    def _dowork(self, now):
+    def _dowork(self):
         # Should we do a classic pack this time?
         if self._classicpack <= 0:
             classicp = False
@@ -1858,4 +1860,4 @@
             self._lastclassic = v
             classicp = not v
         # Run the autopack phase
-        self._storage.autopack(now - self._packtime, classicp)
+        self._storage.autopack(time.time() - self._packtime, classicp)


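With the thread name moved into the NAME class attribute, a _WorkThread
subclass that needs no extra state boils down to two overrides.  A minimal
hypothetical example of the contract (illustrative only -- the sweep()
call is made up; BDBMinimalStorage's real _Autopack appears in the next
diff):

    class _Sweeper(_WorkThread):
        NAME = 'sweeping'

        def _dowork(self):
            # Hypothetical maintenance hook on the storage
            self._storage.sweep()
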
=== ZODB4/BDBStorage/BDBMinimalStorage.py 2.0 => 2.1 ===
--- ZODB4/BDBStorage/BDBMinimalStorage.py:2.0	Wed Dec  4 14:42:20 2002
+++ ZODB4/BDBStorage/BDBMinimalStorage.py	Fri Dec  6 15:41:30 2002
@@ -17,16 +17,13 @@
 
 __version__ = '$Revision$'[-2:][0]
 
-import time
-import threading
-
 # This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
 # http://pybsddb.sourceforge.net.  It is compatible with release 3.4 of
 # PyBSDDB3.
 from bsddb3 import db
 
 from ZODB import POSException
-from ZODB.utils import u64, p64
+from ZODB.utils import p64, u64
 from ZODB.Serialize import findrefs
 from ZODB.ConflictResolution import ConflictResolvingStorage, ResolvedSerial
 
@@ -40,12 +37,6 @@
 PRESENT = 'X'
 ZERO = '\0'*8
 
-# Number of seconds for the autopack thread to sleep before checking to see if
-# it's time for another autopack run.  Lower numbers mean more processing,
-# higher numbers mean less responsiveness to shutdown requests.  10 seconds
-# seems like a good compromise.
-AUTOPACK_CHECK_SLEEP = 10
-
 
 
 class BDBMinimalStorage(BerkeleyBase, ConflictResolvingStorage):
@@ -125,11 +116,9 @@
                     self._withtxn(self._docommit, tid)
             finally:
                 self._lock_release()
-        # Set up the autopacking thread
-        if self._config.frequency > 0:
-            config = self._config
-            self._autopacker = _Autopack(self, config.frequency)
-            self._autopacker.start()
+
+    def _make_autopacker(self, event):
+        return _Autopack(self, event, self._config.frequency)
 
     def _doabort(self, txn, tid):
         co = cs = None
@@ -532,9 +521,8 @@
 
 
 class _Autopack(_WorkThread):
-    def __init__(self, storage, frequency):
-        _WorkThread.__init__(self, storage, frequency, 'autopacking')
+    NAME = 'autopacking'
 
-    def _dowork(self, now):
+    def _dowork(self):
         # Run the autopack phase
         self._storage.pack('ignored')


=== ZODB4/BDBStorage/BerkeleyBase.py 2.1 => 2.2 ===
--- ZODB4/BDBStorage/BerkeleyBase.py:2.1	Wed Dec  4 17:40:26 2002
+++ ZODB4/BDBStorage/BerkeleyBase.py	Fri Dec  6 15:41:30 2002
@@ -28,7 +28,6 @@
 
 # BaseStorage provides primitives for lock acquisition and release, and a host
 # of other methods, some of which are overridden here, some of which are not.
-from ZODB import POSException
 from ZODB.lock_file import lock_file
 from ZODB.BaseStorage import BaseStorage
 from ZODB.Serialize import findrefs
@@ -36,12 +35,11 @@
 
 GBYTES = 1024 * 1024 * 1000
 
-# Maximum number of seconds for background thread to sleep before checking to
-# see if it's time for another autopack run.  Lower numbers mean more
-# processing, higher numbers mean less responsiveness to shutdown requests.
-# 10 seconds seems like a good compromise.  Note that if the check interval is
-# less than the sleep time, the minimum will be used.
-SLEEP_TIME = 10
+# How long should we wait to join one of the background daemon threads?  It's
+# a good idea not to set this too short, or we could corrupt our database.
+# That would be recoverable, but recovery could take a long time too, so it's
+# better to shut down cleanly.
+JOIN_TIME = 10
 
 
 
@@ -182,7 +180,6 @@
 
         # Instantiate a pack lock
         self._packlock = threading.Lock()
-        self._autopacker = None
         self._stop = self._closed = False
         # Initialize a few other things
         self._prefix = prefix
@@ -192,11 +189,23 @@
         self._setupDBs()
         # Initialize the object id counter.
         self._init_oid()
+        # Set up the checkpointing thread
         if config.interval > 0:
-            self._checkpointer = _Checkpoint(self, config.interval)
+            self._checkpointstop = event = threading.Event()
+            self._checkpointer = _Checkpoint(self, event, config.interval)
             self._checkpointer.start()
         else:
             self._checkpointer = None
+        # Set up the autopacking thread
+        if config.frequency > 0:
+            self._autopackstop = event = threading.Event()
+            self._autopacker = self._make_autopacker(event)
+            self._autopacker.start()
+        else:
+            self._autopacker = None
+
+    def _make_autopacker(self, event):
+        raise NotImplementedError
 
     def _setupDB(self, name, flags=0, dbtype=db.DB_BTREE, reclen=None):
         """Open an individual database with the given flags.
@@ -314,9 +323,21 @@
         tables are closed, and finally the environment is force checkpointed
         and closed too.
         """
-        # Set this flag before acquiring the lock so we don't block waiting
-        # for the autopack thread to give up the lock.
+        # We have to shut down the background threads before we acquire the
+        # lock, or we could end up closing the environment before the
+        # autopacking thread exits.
         self._stop = True
+        # Stop the autopacker thread
+        if self._autopacker:
+            self.log('stopping autopacking thread')
+            # Setting the event also toggles the stop flag
+            self._autopackstop.set()
+            self._autopacker.join(JOIN_TIME)
+        if self._checkpointer:
+            self.log('stopping checkpointing thread')
+            # Setting the event also toggles the stop flag
+            self._checkpointstop.set()
+            self._checkpointer.join(JOIN_TIME)
         self._lock_acquire()
         try:
             if not self._closed:
@@ -326,15 +347,6 @@
             self._lock_release()
 
     def _doclose(self):
-        # Stop the autopacker thread
-        if self._autopacker:
-            self.log('stopping autopacking thread')
-            self._autopacker.stop()
-            self._autopacker.join(SLEEP_TIME * 2)
-        if self._checkpointer:
-            self.log('stopping checkpointing thread')
-            self._checkpointer.stop()
-            self._checkpointer.join(SLEEP_TIME * 2)
         # Close all the tables
         for d in self._tables:
             d.close()
@@ -417,30 +429,37 @@
     lock_file(lockfile)
     lockfile.write(str(os.getpid()))
     lockfile.flush()
-    # Create, initialize, and open the environment
-    env = db.DBEnv()
-    if config.logdir is not None:
-        env.set_lg_dir(config.logdir)
-    gbytes, bytes = divmod(config.cachesize, GBYTES)
-    env.set_cachesize(gbytes, bytes)
-    env.open(envname,
-             db.DB_CREATE          # create underlying files as necessary
-             | db.DB_RECOVER       # run normal recovery before opening
-             | db.DB_INIT_MPOOL    # initialize shared memory buffer pool
-             | db.DB_INIT_TXN      # initialize transaction subsystem
-             | db.DB_THREAD        # we use the environment from other threads
-             )
+    try:
+        # Create, initialize, and open the environment
+        env = db.DBEnv()
+        if config.logdir is not None:
+            env.set_lg_dir(config.logdir)
+        gbytes, bytes = divmod(config.cachesize, GBYTES)
+        env.set_cachesize(gbytes, bytes)
+        env.open(envname,
+                 db.DB_CREATE          # create underlying files as necessary
+                 | db.DB_RECOVER       # run normal recovery before opening
+                 | db.DB_INIT_MPOOL    # initialize shared memory buffer pool
+                 | db.DB_INIT_TXN      # initialize transaction subsystem
+                 | db.DB_THREAD        # we use the env from multiple threads
+                 )
+    except:
+        lockfile.close()
+        raise
     return env, lockfile
 
 
 
 class _WorkThread(threading.Thread):
-    def __init__(self, storage, checkinterval, name='work'):
+    NAME = 'worker'
+
+    def __init__(self, storage, event, checkinterval):
         threading.Thread.__init__(self)
         self._storage = storage
+        self._event = event
         self._interval = checkinterval
-        self._name = name
-        # Bookkeeping
+        # Bookkeeping.  _nextcheck is useful as a non-public interface aiding
+        # testing.  See test_autopack.py.
         self._stop = False
         self._nextcheck = checkinterval
         # We don't want these threads to hold up process exit.  That could
@@ -448,31 +467,28 @@
         self.setDaemon(True)
 
     def run(self):
-        name = self._name
+        name = self.NAME
         self._storage.log('%s thread started', name)
         while not self._stop:
             now = time.time()
-            if now > self._nextcheck:
+            if now >= self._nextcheck:
                 self._storage.log('running %s', name)
-                self._dowork(now)
-                self._nextcheck = now + self._interval
-            # Now we sleep for a little while before we check again.  Sleep
-            # for the minimum of self._interval and SLEEP_TIME so as to be as
-            # responsive as possible to .stop() calls.
-            time.sleep(min(self._interval, SLEEP_TIME))
+                self._dowork()
+                # Recalculate `now' because _dowork() could have taken a
+                # while.  time.time() can be expensive, but oh well.
+                self._nextcheck = time.time() + self._interval
+            # Block w/ timeout on the shutdown event.
+            self._event.wait(self._interval)
+            self._stop = self._event.isSet()
         self._storage.log('%s thread finished', name)
 
-    def stop(self):
-        self._stop = True
-
-    def _dowork(self, now):
+    def _dowork(self):
         pass
 
 
 
 class _Checkpoint(_WorkThread):
-    def __init__(self, storage, interval):
-        _WorkThread.__init__(self, storage, interval, 'checkpointing')
+    NAME = 'checkpointing'
 
-    def _dowork(self, now):
+    def _dowork(self):
         self._storage.docheckpoint()
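
The bookkeeping comment in _WorkThread.__init__() notes that _nextcheck
doubles as a non-public test hook.  A hedged sketch of how a test might
use it (purely illustrative; not taken from test_autopack.py):

    # Schedule the work for the worker's next wakeup instead of waiting
    # out the full interval; the thread re-reads _nextcheck each time
    # its event.wait() times out.
    storage._autopacker._nextcheck = 0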