[Zodb-checkins] CVS: ZODB4/BDBStorage - BerkeleyBase.py:2.0

Barry Warsaw barry@wooz.org
Wed, 4 Dec 2002 14:47:17 -0500


Update of /cvs-repository/ZODB4/BDBStorage
In directory cvs.zope.org:/tmp/cvs-serv18074

Modified Files:
      Tag: 2.0
	BerkeleyBase.py 
Log Message:
Get rid of some pre-Python 2.2.2 b/c cruft.


=== Added File ZODB4/BDBStorage/BerkeleyBase.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

"""Base class for BerkeleyStorage implementations.
"""
__version__ = '$Revision: 2.0 $'.split()[-2:][0]

import os
import time
import errno
import threading
from types import StringType

# This uses the Dunn/Kuchling PyBSDDB v3 extension module available from
# http://pybsddb.sourceforge.net
from bsddb3 import db

# BaseStorage provides primitives for lock acquisition and release, and a host
# of other methods, some of which are overridden here, some of which are not.
from ZODB import POSException
from ZODB.lock_file import lock_file
from ZODB.BaseStorage import BaseStorage
from ZODB.Serialize import findrefs
import ThreadLock
import zLOG

GBYTES = 1024 * 1024 * 1024

# Maximum number of seconds for background thread to sleep before checking to
# see if it's time for another autopack run.  Lower numbers mean more
# processing, higher numbers mean less responsiveness to shutdown requests.
# 10 seconds seems like a good compromise.  Note that if the check interval is
# less than the sleep time, the minimum will be used.
SLEEP_TIME = 10



class PackStop(Exception):
    """Escape hatch for pack operations."""



class BerkeleyConfig:
    """Bag of bits for describing various underlying configuration options.

    Berkeley databases are wildly configurable, and this class exposes some of
    that.  To customize these options, instantiate one of these classes and
    set the attributes below to the desired value.  Then pass this instance to
    the Berkeley storage constructor, using the `config' keyword argument.

    Berkeley storages need to be checkpointed occasionally, otherwise
    automatic recovery can take a huge amount of time.  You should set up a
    checkpointing policy which trades off the amount of work done periodically
    against the recovery time.  Note that the Berkeley environment is
    automatically, and forcefully, checkpointed twice when it is closed.

    The following checkpointing attributes are supported:

    - interval indicates how often, in seconds, a Berkeley checkpoint is
      performed.  If this is non-zero, checkpointing is performed by a
      background thread.  Otherwise checkpointing will only be done when the
      storage is closed.   You really want to enable checkpointing. ;)

    - kbyte is passed directly to txn_checkpoint()

    - min is passed directly to txn_checkpoint()

    You can achieve one of the biggest performance wins by moving the Berkeley
    log files to a different disk than the data files.  We saw between 2.5x
    and 7x better performance this way.  Here are the attributes which control
    the log files.

    - logdir if not None, is passed to the environment's set_lg_dir() method
      before it is opened.

    You can also improve performance by tweaking the Berkeley cache size.
    Berkeley's default cache size is 256KB, which is usually too small.  Our
    default cache size is 128MB, which seems like a useful tradeoff between
    resource consumption and improved performance.  You might be able to get
    slightly better results by turning up the cache size, although be mindful
    of your system's limits.  See here for more details:

        http://www.sleepycat.com/docs/ref/am_conf/cachesize.html

    These attributes control cache size settings:

    - cachesize should be the size of the cache in bytes.

    These attributes control the autopacking thread:

    - frequency is the time in seconds after which an autopack phase will be
      performed.  E.g. if frequency is 3600, an autopack will be done once per
      hour.  Set frequency to 0 to disable autopacking (the default).

    - packtime is the time in seconds marking the moment in the past to which
      autopack will pack.  E.g. if packtime is 14400, autopack will pack to 4
      hours in the past.  For Minimal storage, this value is ignored.

    - classicpack is an integer indicating how often an autopack phase should
      do a full classic pack.  E.g. if classicpack is 24 and frequency is
      3600, a classic pack will be performed once per day.  Set to zero to
      never automatically do classic packs.  For Minimal storage, this value
      is ignored -- all packs are classic packs.
    """
    interval = 120
    kbyte = 0
    min = 0
    logdir = None
    cachesize = 128 * 1024 * 1024
    frequency = 0
    packtime = 4 * 60 * 60
    classicpack = 0
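
# A minimal usage sketch for BerkeleyConfig.  The `Full' class and its import
# path below are hypothetical stand-ins for whichever concrete Berkeley
# storage you actually use; the attributes set are the ones documented above:
#
#     from BDBStorage.Full import Full      # hypothetical import path
#     config = BerkeleyConfig()
#     config.interval = 60                  # checkpoint once a minute
#     config.cachesize = 256 * 1024 * 1024  # 256MB Berkeley cache
#     config.logdir = '/logs/zodb'          # keep log files on another disk
#     config.frequency = 3600               # autopack once an hour
#     storage = Full('/var/zodb/env', config=config)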



class BerkeleyBase(BaseStorage):
    """Base storage for Minimal and Full Berkeley implementations."""

    def __init__(self, name, env=None, prefix='zodb_', config=None):
        """Create a new storage.

        name is an arbitrary name for this storage.  It is returned by the
        getName() method.

        Optional env, if given, is either a string or a DBEnv object.  If it
        is a non-empty string, it names the database environment,
        i.e. essentially the name of a directory into which BerkeleyDB will
        store all its supporting files.  It is passed directly to
        DBEnv().open(), which in turn is passed to the BerkeleyDB function
        DBEnv->open() as the db_home parameter.

        Note that if you want to customize the underlying Berkeley DB
        parameters, this directory can contain a DB_CONFIG file as per the
        Sleepycat documentation.

        If env is given and it is not a string, it must be an opened DBEnv
        object as returned by bsddb3.db.DBEnv().  In this case, it is your
        responsibility to create the object and open it with the proper
        flags.

        Optional prefix is the string to prepend to name when passed to
        DB.open() as the dbname parameter.  IOW, prefix+name is passed to the
        BerkeleyDB function DB->open() as the database parameter.  It defaults
        to "zodb_".

        Optional config must be a BerkeleyConfig instance, or None, which
        means to use the default configuration options.
        """
        # sanity check arguments
        if config is None:
            config = BerkeleyConfig()
        self._config = config

        if name == '':
            raise TypeError, 'database name is empty'

        if env is None:
            env = name

        if env == '':
            raise TypeError, 'environment name is empty'
        elif isinstance(env, StringType):
            self._env, self._lockfile = env_from_string(env, self._config)
        else:
            self._env = env

        # Use the absolute path to the environment directory as the name.
        # This should be enough of a guarantee that sortKey() -- which via
        # BaseStorage uses the name -- is globally unique.
        envdir = os.path.abspath(self._env.db_home)
        BaseStorage.__init__(self, envdir)

        # Instantiate a pack lock
        self._packlock = ThreadLock.allocate_lock()
        self._autopacker = None
        self._stop = self._closed = False
        # Initialize a few other things
        self._prefix = prefix
        # Give the subclasses a chance to interpose into the database setup
        # procedure
        self._tables = []
        self._setupDBs()
        # Initialize the object id counter.
        self._init_oid()
        if config.interval > 0:
            self._checkpointer = _Checkpoint(self, config.interval)
            self._checkpointer.start()
        else:
            self._checkpointer = None
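
    # A sketch of the two ways `env' can be supplied, using a hypothetical
    # concrete subclass named `Full' purely for illustration:
    #
    #     # 1. Pass a directory name; the environment is created and opened
    #     #    for you.  A DB_CONFIG file in that directory, if present, is
    #     #    honored by Berkeley DB.
    #     storage = Full('/var/zodb/env')
    #
    #     # 2. Pass an already-opened DBEnv; creating and opening it with the
    #     #    proper flags is then your responsibility.
    #     from bsddb3 import db
    #     env = db.DBEnv()
    #     env.open('/var/zodb/env',
    #              db.DB_CREATE | db.DB_RECOVER | db.DB_INIT_MPOOL
    #              | db.DB_INIT_TXN | db.DB_THREAD)
    #     storage = Full('myname', env=env)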

    def _setupDB(self, name, flags=0, dbtype=db.DB_BTREE, reclen=None):
        """Open an individual database with the given flags.

        flags are passed directly to the underlying DB.set_flags() call.
        Optional dbtype specifies the type of BerkeleyDB access method to
        use.  Optional reclen if not None gives the record length.
        """
        d = db.DB(self._env)
        if flags:
            d.set_flags(flags)
        # Our storage is based on the underlying BSDDB btree database type.
        if reclen is not None:
            d.set_re_len(reclen)
        openflags = db.DB_CREATE
        # DB 4.1.24 requires that operations happening in a transaction must
        # be performed on a database that was opened in a transaction.  Since
        # we do the former, we must do the latter.  However, earlier DB
        # versions don't transactionally protect database open, so this is the
        # most portable way to write the code.
        try:
            openflags |= db.DB_AUTO_COMMIT
        except AttributeError:
            pass
        d.open(self._prefix + name, dbtype, openflags)
        self._tables.append(d)
        return d

    def _setupDBs(self):
        """Set up the storages databases, typically using '_setupDB'.

        This must be implemented in a subclass.
        """
        raise NotImplementedError, '_setupDBs()'
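
    # A sketch of what a subclass's _setupDBs() might look like.  The table
    # names below are hypothetical and purely illustrative:
    #
    #     def _setupDBs(self):
    #         self._serials = self._setupDB('serials', db.DB_DUP)
    #         self._pickles = self._setupDB('pickles')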

    def _init_oid(self):
        """Initialize the object id counter."""
        # If the `serials' database is non-empty, the last object id in the
        # database will be returned (as a [key, value] pair).  Use it to
        # initialize the object id counter.
        #
        # If the database is empty, just initialize it to zero.
        value = self._serials.cursor().last()
        if value:
            self._oid = value[0]
        else:
            self._oid = '\0\0\0\0\0\0\0\0'

    # It can be very expensive to calculate the "length" of the database, so
    # we cache the length and adjust it as we add and remove objects.
    _len = None

    def __len__(self):
        """Return the number of objects in the index."""
        if self._len is None:
            # The cache has never been initialized.  Do it once the expensive
            # way.
            self._len = len(self._serials)
        return self._len

    def new_oid(self, last=None):
        """Create a new object id.

        If last is provided, the new oid will be one greater than that.
        """
        # BAW: the last parameter is undocumented in the UML model
        if self._len is not None:
            # Increment the cached length
            self._len += 1
        return BaseStorage.new_oid(self, last)

    def getSize(self):
        """Return the size of the database."""
        # Return the size of the pickles table as a rough estimate
        filename = os.path.join(self._env.db_home, 'zodb_pickles')
        return os.path.getsize(filename)

    def _vote(self):
        pass

    def _finish(self, tid, user, desc, ext):
        """Called from BaseStorage.tpc_finish(), this commits the underlying
        BSDDB transaction.

        tid is the transaction id
        user is the transaction user
        desc is the transaction description
        ext is the transaction extension

        These are all ignored.
        """
        self._transaction.commit()

    def _abort(self):
        """Called from BaseStorage.tpc_abort(), this aborts the underlying
        BSDDB transaction.
        """
        self._transaction.abort()

    def _clear_temp(self):
        """Called from BaseStorage.tpc_abort(), BaseStorage.tpc_begin(),
        BaseStorage.tpc_finish(), this clears out the temporary log file
        """
        # BAW: no-op this since the right CommitLog file operations are
        # performed by the methods in the derived storage class.
        pass

    def log(self, msg, *args):
        zLOG.LOG(self.__class__.__name__, zLOG.INFO, msg % args)

    def close(self):
        """Close the storage.

        All background threads are stopped and joined first, then all the
        tables are closed, and finally the environment is force checkpointed
        and closed too.
        """
        # Set this flag before acquiring the lock so we don't block waiting
        # for the autopack thread to give up the lock.
        self._stop = True
        self._lock_acquire()
        try:
            if not self._closed:
                self._doclose()
                self._closed = True
        finally:
            self._lock_release()

    def _doclose(self):
        # Stop the autopacker thread
        if self._autopacker:
            self.log('stopping autopacking thread')
            self._autopacker.stop()
            self._autopacker.join(SLEEP_TIME * 2)
        if self._checkpointer:
            self.log('stopping checkpointing thread')
            self._checkpointer.stop()
            self._checkpointer.join(SLEEP_TIME * 2)
        # Close all the tables
        for d in self._tables:
            d.close()
        # As recommended by Keith Bostic @ Sleepycat, we need to do
        # two checkpoints just before we close the environment.
        # Otherwise, auto-recovery on environment opens can be
        # extremely costly.  We want to do auto-recovery for ease of
        # use, although it isn't strictly necessary if the database
        # was shut down gracefully.  The DB_FORCE flag is required for
        # the second checkpoint, but we include it in both because it
        # can't hurt and is more robust.
        self._env.txn_checkpoint(0, 0, db.DB_FORCE)
        self._env.txn_checkpoint(0, 0, db.DB_FORCE)
        lockfile = os.path.join(self._env.db_home, '.lock')
        self._lockfile.close()
        self._env.close()
        os.unlink(lockfile)

    def _update(self, deltas, data, incdec):
        for oid in findrefs(data):
            rc = deltas.get(oid, 0) + incdec
            if rc == 0:
                # Save space in the dict by zapping zeroes
                del deltas[oid]
            else:
                deltas[oid] = rc

    def _withlock(self, meth, *args):
        self._lock_acquire()
        try:
            return meth(*args)
        finally:
            self._lock_release()

    def _withtxn(self, meth, *args, **kws):
        txn = self._env.txn_begin()
        try:
            ret = meth(txn, *args, **kws)
        except PackStop:
            # Escape hatch for shutdown during pack.  Like the bare except --
            # i.e. abort the transaction -- but swallow the exception.
            txn.abort()
        except:
            #import traceback ; traceback.print_exc()
            txn.abort()
            raise
        else:
            txn.commit()
            return ret
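
    # Subclasses typically compose these two helpers: take the storage lock,
    # then run the real work inside a Berkeley transaction.  A hypothetical
    # example (_dopack() is illustrative, not a method of this class):
    #
    #     def pack(self, t, referencesf):
    #         self._withlock(self._withtxn, self._dopack, t)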

    def docheckpoint(self):
        config = self._config
        self._lock_acquire()
        try:
            if not self._stop:
                self._env.txn_checkpoint(config.kbyte, config.min)
        finally:
            self._lock_release()



def env_from_string(envname, config):
    # BSDDB requires that the directory already exists.  BAW: do we need to
    # adjust umask to ensure filesystem permissions?
    try:
        os.mkdir(envname)
    except OSError, e:
        if e.errno <> errno.EEXIST: raise
        # already exists
    # Create the lock file so no other process can open the environment.
    # This is required in order to work around the Berkeley lock
    # exhaustion problem (i.e. we do our own application level locks
    # rather than rely on Berkeley's finite page locks).
    lockpath = os.path.join(envname, '.lock')
    try:
        lockfile = open(lockpath, 'r+')
    except IOError, e:
        if e.errno <> errno.ENOENT: raise
        lockfile = open(lockpath, 'w+')
    lock_file(lockfile)
    lockfile.write(str(os.getpid()))
    lockfile.flush()
    # Create, initialize, and open the environment
    env = db.DBEnv()
    if config.logdir is not None:
        env.set_lg_dir(config.logdir)
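    # Split the requested cache size into whole gigabytes plus leftover bytes,
    # the form set_cachesize() expects; e.g. the default 128MB cache becomes
    # (0, 134217728).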
    gbytes, bytes = divmod(config.cachesize, GBYTES)
    env.set_cachesize(gbytes, bytes)
    env.open(envname,
             db.DB_CREATE          # create underlying files as necessary
             | db.DB_RECOVER       # run normal recovery before opening
             | db.DB_INIT_MPOOL    # initialize shared memory buffer pool
             | db.DB_INIT_TXN      # initialize transaction subsystem
             | db.DB_THREAD        # we use the environment from other threads
             )
    return env, lockfile



class _WorkThread(threading.Thread):
    def __init__(self, storage, checkinterval, name='work'):
        threading.Thread.__init__(self)
        self._storage = storage
        self._interval = checkinterval
        self._name = name
        # Bookkeeping
        self._stop = False
        self._nextcheck = checkinterval
        # We don't want these threads to hold up process exit.  That could
        # lead to corrupt databases, but recovery should ultimately save us.
        self.setDaemon(True)

    def run(self):
        name = self._name
        self._storage.log('%s thread started', name)
        while not self._stop:
            now = time.time()
            if now > self._nextcheck:
                self._storage.log('running %s', name)
                self._dowork(now)
                self._nextcheck = now + self._interval
            # Now we sleep for a little while before we check again.  Sleep
            # for the minimum of self._interval and SLEEP_TIME so as to be as
            # responsive as possible to .stop() calls.
            time.sleep(min(self._interval, SLEEP_TIME))
        self._storage.log('%s thread finished', name)

    def stop(self):
        self._stop = True

    def _dowork(self, now):
        pass



class _Checkpoint(_WorkThread):
    def __init__(self, storage, interval):
        _WorkThread.__init__(self, storage, interval, 'checkpointing')

    def _dowork(self, now):
        self._storage.docheckpoint()