[Zope3-checkins] CVS: Zope3/src/zodb/zeo - __init__.py:1.1.2.1 cache.py:1.1.2.1 client.py:1.1.2.1 commitlog.py:1.1.2.1 exceptions.py:1.1.2.1 interfaces.py:1.1.2.1 runsvr.py:1.1.2.1 server.py:1.1.2.1 simul.py:1.1.2.1 stats.py:1.1.2.1 stubs.py:1.1.2.1 tbuf.py:1.1.2.1 utils.py:1.1.2.1

Jim Fulton jim@zope.com
Mon, 23 Dec 2002 14:30:54 -0500


Update of /cvs-repository/Zope3/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv19908/zodb/zeo

Added Files:
      Tag: NameGeddon-branch
	__init__.py cache.py client.py commitlog.py exceptions.py 
	interfaces.py runsvr.py server.py simul.py stats.py stubs.py 
	tbuf.py utils.py 
Log Message:
Initial renaming before debugging

=== Added File Zope3/src/zodb/zeo/__init__.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""ZEO -- Zope Enterprise Objects.

See the file README.txt in this directory for an overview.

ZEO's home on the web is

    http://www.zope.org/Products/ZEO/

"""

version = "2.0+"


=== Added File Zope3/src/zodb/zeo/cache.py === (577/677 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################

# XXX TO DO
# use two indices rather than the sign bit of the index??????
# add a shared routine to read + verify a record???
# redesign header to include vdlen???
# rewrite the cache using a different algorithm???

"""Implement a client cache

The cache is managed as two files.

The cache can be persistent (meaning it survives a process restart)
or temporary.  It is persistent if the client argument is not None.

Persistent cache files live in the var directory and are named
'c<storage>-<client>-<digit>.zec' where <storage> is the storage
argument (default '1'), <client> is the client argument, and <digit> is
0 or 1.  Temporary cache files are unnamed files in the standard
temporary directory as determined by the tempfile module.

The ClientStorage overrides the client name default to the value of
the environment variable ZEO_CLIENT, if it exists.

Each cache file has a 4-byte magic number followed by a sequence of
records of the form:

  offset in record: name -- description

  0: oid -- 8-byte object id

  8: status -- 1-byte status 'v': valid, 'n': non-version valid, 'i': invalid
               ('n' means only the non-version data in the record is valid)

  9: tlen -- 4-byte (unsigned) record length

  13: vlen -- 2-byte (unsigned) version length


[-=- -=- -=- 577 lines omitted -=- -=- -=-]

                if vlen+dlen+43+vdlen != tlen:
                    self.rilog("inconsistent lengths", pos, fileindex)
                    break
                seek(vdlen, 1)
                vs = read(8)
                if read(4) != h[9:13]:
                    self.rilog("inconsistent tlen", pos, fileindex)
                    break
            else:
                if h[8] in 'vn' and vlen == 0:
                    if dlen+31 != tlen:
                        self.rilog("inconsistent nv lengths", pos, fileindex)
                    seek(dlen, 1)
                    if read(4) != h[9:13]:
                        self.rilog("inconsistent nv tlen", pos, fileindex)
                        break
                vs = None

            if h[8] in 'vn':
                if fileindex:
                    index[oid] = -pos
                else:
                    index[oid] = pos
                serial[oid] = h[-8:], vs
            else:
                if serial.has_key(oid):
                    # We have a record for this oid, but it was invalidated!
                    del serial[oid]
                    del index[oid]


            pos = pos + tlen
            count += 1

        f.seek(pos)
        try:
            f.truncate()
        except:
            pass

        if count:
            self.log("read_index: cache file %d has %d records and %d bytes",
                     fileindex, count, pos)

        return pos

    def rilog(self, msg, pos, fileindex):
        """Log a read_index diagnostic for one position in a cache file.

        msg       -- short description of what was found
        pos       -- position in the cache file (formatted with %d)
        fileindex -- number of the cache file (formatted with %d)
        """
        template = "read_index: %s at position %d in cache file %d"
        self.log(template, msg, pos, fileindex)


=== Added File Zope3/src/zodb/zeo/client.py === (735/835 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""The ClientStorage class and the exceptions that it may raise.

Public contents of this module:

ClientStorage -- the main class, implementing the Storage API
ClientStorageError -- exception raised by ClientStorage
UnrecognizedResult -- exception raised by ClientStorage
ClientDisconnected -- exception raised by ClientStorage
"""

# XXX TO DO
# get rid of beginVerify, set up _tfile in verify_cache
# set self._storage = stub later, in endVerify
# if wait is given, wait until verify is complete

import cPickle
import os
import socket
import tempfile
import threading
import time
import types
import logging

from zodb.zeo import ClientCache, ServerStub
from zodb.zeo.tbuf import TransactionBuffer
from zodb.zeo.exceptions import Disconnected
from zodb.zeo.zrpc.client import ConnectionManager

from zodb import POSException
from zodb.timestamp import TimeStamp

try:
    from zodb.conflict import ResolvedSerial
except ImportError:
    ResolvedSerial = 'rs'


[-=- -=- -=- 735 lines omitted -=- -=- -=-]

        # Queue an invalidate for the end of the verification procedure.
        if self._pickler is None:
            # XXX This should never happen
            return
        self._pickler.dump(args)

    def endVerify(self):
        """Server callback to signal end of cache validation.

        Replays the (oid, version) pairs that were pickled into
        self._tfile, invalidating each one first in the client cache
        and then in the database, and finally closes the file.

        Does nothing if no pickler is active.  self._tfile is reset to
        None before the replay starts.
        """
        if self._pickler is None:
            return
        # A pair with a false oid marks the end of the queued records.
        self._pickler.dump((0,0))
        self._tfile.seek(0)
        unpick = cPickle.Unpickler(self._tfile)
        f = self._tfile
        self._tfile = None

        try:
            while 1:
                oid, version = unpick.load()
                if not oid:
                    break
                self._cache.invalidate(oid, version=version)
                self._db.invalidate(oid, version=version)
        finally:
            # Close the file even when an invalidate call raises; it is
            # no longer reachable through self._tfile at this point.
            f.close()

    def invalidateTrans(self, args):
        """Server callback to invalidate a list of (oid, version) pairs.

        This is called as the result of a transaction.

        Each pair is invalidated in the client cache first and then in
        the database.  An AttributeError raised by the database
        invalidation is logged as an error and does not stop the
        processing of the remaining pairs.
        """
        for oid, version in args:
            self._cache.invalidate(oid, version=version)
            try:
                self._db.invalidate(oid, version=version)
            except AttributeError, msg:
                # NOTE(review): presumably this covers self._db not being
                # usable yet -- confirm against where _db is assigned.
                self.logger.error("Invalidate(%r, %r) failed for _db: %s",
                                  oid, version, msg)

    # Unfortunately, the ZEO 2 wire protocol uses different names for
    # several of the callback methods invoked by the StorageServer.
    # We can't change the wire protocol at this point because that
    # would require synchronized updates of clients and servers and we
    # don't want that.  So here we alias the old names to their new
    # implementations.
    #
    # Note that 'invalidate' and 'Invalidate' differ only in case but
    # are bound to different callbacks.

    begin = beginVerify
    invalidate = invalidateVerify
    end = endVerify
    Invalidate = invalidateTrans




=== Added File Zope3/src/zodb/zeo/commitlog.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Log a transaction's commit info during two-phase commit.

A storage server allows multiple clients to commit transactions, but
must serialize them as they actually execute at the server.  The
concurrent commits are achieved by logging actions up until the
tpc_vote().  At that point, the entire transaction is committed on the
real storage.
"""
import cPickle
import tempfile

class CommitLog:
    """Log of the store() calls made on behalf of a transaction.

    Every store is pickled to a temporary file.  get_loader() rewinds
    that file and returns an unpickler for reading the records back.
    """

    def __init__(self):
        self.file = tempfile.TemporaryFile(suffix=".log")
        pickler = cPickle.Pickler(self.file, 1)
        # Fast mode turns off the pickler's memo (see the pickle docs).
        pickler.fast = 1
        self.pickler = pickler
        self.stores = 0     # number of records written by store()
        self.read = 0       # set to 1 once get_loader() has been called

    def store(self, oid, serial, data, version):
        """Append one (oid, serial, data, version) record to the log."""
        record = (oid, serial, data, version)
        self.pickler.dump(record)
        self.stores = self.stores + 1

    def get_loader(self):
        """Rewind the log and return (number of stores, Unpickler)."""
        self.read = 1
        self.file.seek(0)
        loader = cPickle.Unpickler(self.file)
        return self.stores, loader


=== Added File Zope3/src/zodb/zeo/exceptions.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Exceptions for ZEO."""

class Disconnected(Exception):
    """Exception raised when a ZEO client is disconnected from the ZEO server.

    Derives directly from Exception and adds no attributes or methods.
    """


=== Added File Zope3/src/zodb/zeo/interfaces.py ===
from zope.interface import Interface

class ICache(Interface):
    """ZEO client cache.

    __init__(storage, size, client, var)

    All arguments optional.

    storage -- name of storage
    size -- max size of cache in bytes
    client -- a string; if specified, cache is persistent.
    var -- var directory to store cache files in

    The methods below are declared without a self argument and have
    no bodies other than their docstrings; this class only describes
    the cache API.
    """

    def open():
        """Build an index of the cache and return object info tuples.

        An object info tuple is a pair containing an object id and a
        pair of serialnos, a non-version serialno and a version serialno:
        oid, (serial, ver_serial)

        The returned sequence is used for cache validation.
        """

    def close():
        """Closes the cache."""

    def verify(func):
        """Call func on every object in cache.

        func is called with three arguments
        func(oid, serial, ver_serial)
        """

    def invalidate(oid, version):
        """Remove object from cache.

        oid -- object id of the object to remove
        version -- NOTE(review): presumably the version the
                   invalidation applies to -- confirm against the
                   implementation.
        """

    def load(oid, version):
        """Load object from cache.

        Return None if object not in cache.
        Return data, serialno if object is in cache.
        """

    def store(oid, p, s, version, pv, sv):
        """Store a new object in the cache.

        NOTE(review): the argument names suggest that p and s are the
        non-version pickle and serialno, and pv and sv their version
        counterparts -- confirm against the implementation.
        """

    def update(oid, serial, version, data):
        """Update an object already in the cache.

        XXX This method is called to update objects that were modified by
        a transaction.  It's likely that it is already in the cache,
        and it may be possible for the implementation to operate more
        efficiently.
        """

    def modifiedInVersion(oid):
        """Return the version an object is modified in.

        '' signifies the trunk.
        Returns None if the object is not in the cache.
        """

    def checkSize(size):
        """Check if adding size bytes would exceed cache limit.

        This method is often called just before store or update.  The
        size is a hint about the amount of data that is about to be
        stored.  The cache may want to evict some data to make space.
        """


=== Added File Zope3/src/zodb/zeo/runsvr.py ===
#! /usr/bin/env python
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Start the ZEO storage server.

Usage: %s [-C URL] [-a ADDRESS] [-f FILENAME] [-h]

Options:
-C/--configuration URL -- configuration file or URL
-a/--address ADDRESS -- server address of the form PORT, HOST:PORT, or PATH
                        (a PATH must contain at least one "/")
-f/--filename FILENAME -- filename for FileStorage
-h/--help -- print this usage message and exit

Unless -C is specified, -a and -f are required.
"""

# The code here is designed to be reused by other, similar servers.
# For the foreseeable future, it must work under Python 2.1 as well as
# 2.2 and above.

# XXX The option parsing infrastructure could be shared with zdaemon.py

import os
import sys
import getopt
import signal
import socket
import logging

import ZConfig


class Options:

    """A class to parse and hold the command line options.

    Options are represented by various attributes (configuration etc.).
    Positional arguments are represented by the args attribute.

    This also has a public usage() method that can be used to report
    errors related to the command line.
    """

    configuration = None                # set by -C; file name or URL
    rootconf = None                     # set by load_configuration()

    # Class-level default only; __init__ rebinds self.args to the list
    # returned by getopt, and this class never mutates the default.
    args = []

    def __init__(self, args=None, progname=None, doc=None):
        """Constructor.

        Optional arguments:

        args     -- the command line arguments, less the program name
                    (default is sys.argv[1:] at the time of call)

        progname -- the program name (default sys.argv[0])

        doc      -- usage message (default, __main__.__doc__)

        Parses the arguments, calls handle_option() for every option
        found and then check_options().  A parse error is reported
        through usage().
        """

        if args is None:
            args = sys.argv[1:]
        if progname is None:
            progname = sys.argv[0]
        self.progname = progname
        if doc is None:
            import __main__
            doc = __main__.__doc__
        if doc and not doc.endswith("\n"):
            doc += "\n"
        self.doc = doc
        try:
            self.options, self.args = getopt.getopt(args,
                                                    self._short_options,
                                                    self._long_options)
        except getopt.error:
            # Fetch the exception through sys.exc_info() so that this
            # clause parses under both old and new Python versions.
            self.usage(str(sys.exc_info()[1]))
        for opt, arg in self.options:
            self.handle_option(opt, arg)
        self.check_options()

    # Default set of options.  Subclasses should override.
    # Long option names are given to getopt without the leading "--";
    # with the dashes included no long option is ever recognized.
    _short_options = "C:h"
    _long_options = ["configuration=", "help"]

    def handle_option(self, opt, arg):
        """Handle one option.  Subclasses should override.

        This sets the various instance variables overriding the defaults.

        When -h is detected, print the module docstring to stdout and exit(0).
        """
        if opt in ("-C", "--configuration"):
            self.set_configuration(arg)
        if opt in ("-h", "--help"):
            self.help()

    def set_configuration(self, arg):
        """Remember arg as the configuration file or URL to load."""
        self.configuration = arg

    def check_options(self):
        """Check options.  Subclasses may override.

        This can be used to ensure certain options are set, etc.
        """
        self.load_configuration()

    def load_configuration(self):
        """Load self.configuration into self.rootconf, at most once.

        Does nothing when a root configuration is already present or
        when no configuration was specified.
        """
        if self.rootconf or not self.configuration:
            return
        self.rootconf = ZConfig.load(self.configuration)

    def help(self):
        """Print a long help message (self.doc) to stdout and exit(0).

        Occurrences of "%s" in self.doc are replaced by self.progname.
        A missing usage message is treated as an empty one.
        """
        # replace() is a no-op when "%s" does not occur, so no separate
        # search is needed (and a "%s" at offset 0 is handled as well).
        doc = (self.doc or "").replace("%s", self.progname)
        sys.stdout.write(doc + "\n")
        sys.exit(0)

    def usage(self, msg):
        """Print a brief error message to stderr and exit(2)."""
        sys.stderr.write("Error: %s\n" % str(msg))
        sys.stderr.write("For help, use %s -h\n" % self.progname)
        sys.exit(2)


class ZEOOptions(Options):

    """Command line and configuration options for the ZEO server.

    Adds -a/--address and -f/--filename to the base class options.
    Settings that were not given on the command line are taken from
    the configuration loaded through -C/--configuration, if any.
    """

    hostname = None                     # A subclass may set this
    hostconf = None                     # <Host> section
    zeoconf = None                      # <ZEO> section
    logconf = None                      # <Log> section

    family = None                       # set by -a; AF_UNIX or AF_INET
    address = None                      # set by -a; string or (host, port)
    storages = None                     # set by -f

    _short_options = "a:C:f:h"
    # Long option names are given to getopt without the leading "--";
    # with the dashes included no long option is ever recognized.
    _long_options = [
        "address=",
        "configuration=",
        "filename=",
        "help",
        ]

    def handle_option(self, opt, arg):
        """Handle one option; unknown ones go to the base class.

        -a/--address sets family and address: an argument containing
        "/" is taken as a Unix socket path, anything else as PORT or
        HOST:PORT.  -f/--filename adds a FileStorage specification
        under the next numeric name ("1", "2", ...).
        """
        # Alphabetical order please!
        if opt in ("-a", "--address"):
            if "/" in arg:
                self.family = socket.AF_UNIX
                self.address = arg
            else:
                self.family = socket.AF_INET
                if ":" in arg:
                    host, port = arg.split(":", 1)
                else:
                    host = ""
                    port = arg
                try:
                    port = int(port)
                except (TypeError, ValueError, OverflowError):
                    # The errors int() raises for a bad argument;
                    # anything else (e.g. KeyboardInterrupt) propagates.
                    self.usage("invalid port number: %r" % port)
                self.address = (host, port)
        elif opt in ("-f", "--filename"):
            from zodb.storage.file import FileStorage
            if not self.storages:
                self.storages = {}
            key = str(1 + len(self.storages))
            self.storages[key] = (FileStorage, {"file_name": arg})
        else:
            # Pass it to the base class, for --help/-h
            Options.handle_option(self, opt, arg)

    def check_options(self):
        """Require at least one storage, an address and no positional args.

        Each violation is reported through usage().
        """
        Options.check_options(self) # Calls load_configuration()
        if not self.storages:
            self.usage("no storages specified; use -f or -C")
        if self.family is None:
            self.usage("no server address specified; use -a or -C")
        if self.args:
            self.usage("positional arguments are not supported")

    def load_configuration(self):
        """Load the configuration and locate its Host, ZEO and Log sections.

        Does nothing further when there is no root configuration.
        Otherwise sets hostconf, zeoconf and logconf and then fills in
        the address and the storages from them.
        """
        Options.load_configuration(self) # Sets self.rootconf
        if not self.rootconf:
            return
        try:
            self.hostconf = self.rootconf.getSection("Host")
        except ZConfig.ConfigurationConflictingSectionError:
            # Retry by name, using this machine's fully qualified name
            # unless a hostname was already set.
            if not self.hostname:
                self.hostname = socket.getfqdn()
            self.hostconf = self.rootconf.getSection("Host", self.hostname)
        if self.hostconf is None:
            # If no <Host> section exists, fall back to the root
            self.hostconf = self.rootconf
        self.zeoconf = self.hostconf.getSection("ZEO")
        if self.zeoconf is None:
            # If no <ZEO> section exists, fall back to the host (or root)
            self.zeoconf = self.hostconf
        self.logconf = self.hostconf.getSection("Log")

        # Now extract options from various configuration sections
        self.load_zeoconf()
        self.load_storages()

    def load_zeoconf(self):
        """Take the server address from the configuration.

        Skipped when -a already set the address family.  Reads
        'server-port' and 'path' from the ZEO section; giving both is
        reported through usage().
        """
        # Get some option defaults from the configuration
        if self.family:
            # -a option overrides
            return
        port = self.zeoconf.getint("server-port")
        path = self.zeoconf.get("path")
        if port and path:
            self.usage(
                "Configuration contains conflicting ZEO information:\n"
                "Exactly one of 'path' and 'server-port' may be given.")
        if port:
            host = self.hostconf.get("hostname", "")
            self.family = socket.AF_INET
            self.address = (host, port)
        elif path:
            self.family = socket.AF_UNIX
            self.address = path

    def load_storages(self):
        """Take the storage specifications from the configuration.

        Skipped when -f already supplied storages.  Unnamed Storage
        sections get the next numeric name ("1", "2", ...).
        """
        # Get the storage specifications
        if self.storages:
            # -f option overrides
            return
        storagesections = self.zeoconf.getChildSections("Storage")
        self.storages = {}
        from zodb.config import getStorageInfo
        for section in storagesections:
            name = section.name
            if not name:
                name = str(1 + len(self.storages))
            if self.storages.has_key(name):
                # (Actually, the parser doesn't allow this)
                self.usage("duplicate storage name %r" % name)
            self.storages[name] = getStorageInfo(section)


class ZEOServer:

    """Driver that opens the storages and runs a StorageServer.

    The options object supplies the address family, the address and
    the storage specifications.
    """

    OptionsClass = ZEOOptions

    def __init__(self, options=None):
        """Remember options, creating an OptionsClass instance if None."""
        if options is None:
            options = self.OptionsClass()
        self.options = options

    def main(self):
        """Run the server until the main loop ends.

        The storages are closed and the socket file is removed again
        on the way out, also when an exception is raised.
        """
        self.check_socket()
        self.clear_socket()
        try:
            self.open_storages()
            self.setup_signals()
            self.create_server()
            self.loop_forever()
        finally:
            self.close_storages()
            self.clear_socket()

    def check_socket(self):
        """Report a usage error if something accepts connections on the address."""
        if self.can_connect(self.options.family, self.options.address):
            self.options.usage("address %s already in use" %
                               repr(self.options.address))

    def can_connect(self, family, address):
        """Return 1 if a stream connection to address succeeds, else 0.

        The probe socket is closed in either case.
        """
        s = socket.socket(family, socket.SOCK_STREAM)
        try:
            try:
                s.connect(address)
            except socket.error:
                return 0
            return 1
        finally:
            # Close the socket on failure too, so a refused probe does
            # not leave a descriptor open.
            s.close()

    def clear_socket(self):
        """Remove the socket file when the address is a string (a path).

        A failure to remove it is ignored.
        """
        if isinstance(self.options.address, type("")):
            try:
                os.unlink(self.options.address)
            except os.error:
                pass

    def open_storages(self):
        """Instantiate every configured storage into self.storages."""
        self.storages = {}
        for name, (cls, args) in self.options.storages.items():
            logging.info("open storage %r: %s.%s(**%r)",
                         name, cls.__module__, cls.__name__, args)
            self.storages[name] = cls(**args)

    def setup_signals(self):
        """Set up signal handlers.

        The signal handler for SIGFOO is a method handle_sigfoo().
        If no handler method is defined for a signal, the signal
        action is not changed from its initial value.  The handler
        method is called without additional arguments.
        """
        if os.name != "posix":
            return
        if hasattr(signal, 'SIGXFSZ'):
            signal.signal(signal.SIGXFSZ, signal.SIG_IGN) # Special case
        init_signames()
        for sig, name in signames.items():
            method = getattr(self, "handle_" + name.lower(), None)
            if method is not None:
                # Bind method as a default argument so that each
                # wrapper keeps the handler of its own iteration.
                def wrapper(sig_dummy, frame_dummy, method=method):
                    method()
                signal.signal(sig, wrapper)

    def create_server(self):
        """Create the StorageServer for the address and opened storages."""
        from zodb.zeo.server import StorageServer
        self.server = StorageServer(self.options.address, self.storages)

    def loop_forever(self):
        """Run the ThreadedAsync main loop."""
        import ThreadedAsync
        ThreadedAsync.loop()

    def handle_sigterm(self):
        """SIGTERM handler: log and exit with status 0."""
        logging.info("terminated by SIGTERM")
        sys.exit(0)

    def handle_sigint(self):
        """SIGINT handler: log and exit with status 0."""
        logging.info("terminated by SIGINT")
        sys.exit(0)

    def handle_sigusr2(self):
        """SIGUSR2 handler: only logs that nothing is implemented yet."""
        # XXX What to do here?
        logging.error("Don't know how to reinitialize log files yet")

    def close_storages(self):
        """Close every opened storage, logging failures and continuing."""
        for name, storage in self.storages.items():
            logging.info("closing storage %r", name)
            try:
                storage.close()
            except: # Keep going
                # Deliberately broad: one failing storage must not
                # prevent the others from being closed.
                logging.exception("failed to close storage %r", name)


# Signal names

signames = None

def signame(sig):
    """Return a symbolic name for a signal.

    The module-level table of names is built on first use.  When the
    table has no (non-empty) name for sig, "signal NNN" is returned
    instead.
    """
    if signames is None:
        init_signames()
    label = signames.get(sig)
    if label:
        return label
    return "signal %d" % sig

def init_signames():
    """(Re)build the module-level signames table: signal number -> name.

    Every attribute of the signal module whose name starts with "SIG"
    but not with "SIG_" is entered.  When several names share one
    number (for example SIGABRT and SIGIOT), the alphabetically first
    name is the one kept, so the table does not depend on the order in
    which the signal module's dictionary is traversed.
    """
    global signames
    candidates = []
    for name, sig in signal.__dict__.items():
        k_startswith = getattr(name, "startswith", None)
        if k_startswith is None:
            continue
        if k_startswith("SIG") and not k_startswith("SIG_"):
            candidates.append((name, sig))
    # Insert in reverse alphabetical order: for aliased numbers the
    # last write wins, which is then the alphabetically first name.
    candidates.sort()
    candidates.reverse()
    table = {}
    for name, sig in candidates:
        table[sig] = name
    signames = table


# Main program

def main(args=None):
    """Configure logging, parse the options and run the ZEO server.

    args is handed to ZEOOptions unchanged.
    """

    # Initialize the logging module.
    # XXX This is a temporary hack.
    import logging.config
    logging.basicConfig()
    logging.root.setLevel(logging.CRITICAL)
    # A log.ini in the current directory, when present, is applied on
    # top of the basic setup.
    ini_name = "log.ini"
    if os.path.exists(ini_name):
        logging.config.fileConfig(ini_name)

    server = ZEOServer(ZEOOptions(args))
    server.main()

if __name__ == "__main__":
    main()


=== Added File Zope3/src/zodb/zeo/server.py === (687/787 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""The StorageServer class and the exception that it may raise.

This server acts as a front-end for one or more real storages, like
file storage or Berkeley storage.

XXX Need some basic access control-- a declaration of the methods
exported for invocation by the server.
"""

import asyncore
import cPickle
import os
import sys
import threading
import time
import logging

from zodb.zeo import ClientStub
from zodb.zeo.commitlog import CommitLog
from zodb.zeo.zrpc.server import Dispatcher
from zodb.zeo.zrpc.connection import ManagedServerConnection, Delay, MTDelay

from transaction.txn import Transaction
from zodb.interfaces import StorageError, StorageTransactionError
from zodb.interfaces import TransactionError, ReadOnlyError

class StorageServerError(StorageError):
    """Error reported when an unpickleable exception is raised.

    Derives from StorageError, so handlers for StorageError catch it
    as well.
    """

class StorageServer:

    """The server side implementation of ZEO.

    The StorageServer is the 'manager' for incoming connections.  Each
    connection is associated with its own ZEOStorage instance (defined
    below).  The StorageServer may handle multiple storages; each
    ZEOStorage instance only handles a single storage.

[-=- -=- -=- 687 lines omitted -=- -=- -=-]

        meth = getattr(new_strategy, self.name)
        return meth(*self.args)

    def abort(self, zeo_storage):
        """Drop the first waiting entry that belongs to zeo_storage.

        The entries of self.storage._waiting are pairs whose second
        item is compared by identity; at most one entry is removed.
        """
        queue = self.storage._waiting
        idx = 0
        while idx < len(queue):
            _delay, owner = queue[idx]
            if owner is zeo_storage:
                del queue[idx]
                return
            idx += 1

def run_in_thread(method, *args):
    """Start method(*args) in a SlowMethodThread and return its delay."""
    worker = SlowMethodThread(method, args)
    worker.start()
    return worker.delay

class SlowMethodThread(threading.Thread):
    """Thread that runs one potentially slow storage method.

    The delay attribute holds the MTDelay object through which the
    outcome is delivered: reply() gets the return value, error() gets
    the sys.exc_info() triple of a failure.
    """

    # Run from a standard asyncore read handler, a long-running storage
    # method would block all other server activity until it completed.
    # Spawning a separate thread avoids that: the caller gets the
    # MTDelay() object at once and the thread answers through it when
    # the method has finished.

    def __init__(self, method, args):
        threading.Thread.__init__(self)
        self.delay = MTDelay()
        self._args = args
        self._method = method

    def run(self):
        try:
            outcome = self._method(*self._args)
        except (SystemExit, KeyboardInterrupt):
            # Never turn these into an error reply.
            raise
        except Exception:
            self.delay.error(sys.exc_info())
            return
        self.delay.reply(outcome)

# Patch up class references
# These class attributes are assigned here, at module level, rather
# than in the class bodies.
# NOTE(review): presumably because the classes on the right-hand side
# are defined after the classes that refer to them -- confirm.
StorageServer.ZEOStorageClass = ZEOStorage
ZEOStorage.DelayedCommitStrategyClass = DelayedCommitStrategy
ZEOStorage.ImmediateCommitStrategyClass = ImmediateCommitStrategy


=== Added File Zope3/src/zodb/zeo/simul.py === (664/764 lines abridged)
#! /usr/bin/env python
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Cache simulation.

Usage: simul.py [-bflyz] [-X] [-s size] tracefile

Use one of -b, -f, -l, -y or -z to select the cache simulator:
-b: buddy system allocator
-f: simple free list allocator
-l: idealized LRU (no allocator)
-y: variation on the existing ZEO cache that copies to current file
-z: existing ZEO cache (default)

Options:
-s size: cache size in MB (default 20 MB)
-X: enable heuristic checking for misaligned records: oids > 2**32
    will be rejected; this requires the tracefile to be seekable

Note: the buddy system allocator rounds the cache size up to a power of 2
"""

import sys
import time
import getopt
import struct

def usage(msg):
    """Print msg, followed by the module docstring, to stderr.

    This only prints; it does not exit.
    """
    print >>sys.stderr, msg
    print >>sys.stderr, __doc__

def main():
    # Parse options
    MB = 1000*1000
    cachelimit = 20*MB
    simclass = ZEOCacheSimulation
    heuristic = 0
    try:
        opts, args = getopt.getopt(sys.argv[1:], "bflyzs:X")

[-=- -=- -=- 664 lines omitted -=- -=- -=-]

    queue = []
    T = 0
    blocks = 0
    while T < 5000:
        while queue and queue[0][0] <= T:
            time, node = heapq.heappop(queue)
            assert time == T
            ##print "free addr=%d, size=%d" % (node.addr, node.size)
            cache.free(node)
            blocks -= 1
        size = random.randint(100, 2000)
        lifetime = random.randint(1, 100)
        node = cache.alloc(size)
        if node is None:
            print "out of mem"
            cache.dump("T=%4d: %d blocks;" % (T, blocks))
            break
        else:
            ##print "alloc addr=%d, size=%d" % (node.addr, node.size)
            blocks += 1
            heapq.heappush(queue, (T + lifetime, node))
        T = T+1
        if T % reportfreq == 0:
            cache.dump("T=%4d: %d blocks;" % (T, blocks))

def hitrate(loads, hits):
    """Format hits as a percentage of loads, e.g. ' 50.0%'.

    A load count below one is treated as one, so a run without any
    loads is reported as '  0.0%' instead of dividing by zero.
    """
    denominator = max(1, loads)
    percent = 100.0 * hits / denominator
    return "%5.1f%%" % percent

def duration(secs):
    """Render a number of seconds as 'h:mm:ss', 'm:ss' or plain seconds.

    Leading fields that are zero are left out, so 61 gives '1:01' and
    5 gives '5'.
    """
    minutes, seconds = divmod(secs, 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        text = "%d:%02d:%02d" % (hours, minutes, seconds)
    elif minutes:
        text = "%d:%02d" % (minutes, seconds)
    else:
        text = "%d" % seconds
    return text

def addcommas(n):
    """Return str(n) with a comma between each group of three digits.

    A leading minus sign is kept in front of the grouped digits.
    """
    text = str(n)
    sign = ''
    if text[0] == '-':
        sign = '-'
        text = text[1:]
    # Peel three-character groups off the right-hand end.
    groups = []
    while len(text) > 3:
        groups.append(text[-3:])
        text = text[:-3]
    groups.append(text)
    groups.reverse()
    return sign + ','.join(groups)

# Script entry point: whatever main() returns is handed to sys.exit().
if __name__ == "__main__":
    sys.exit(main())


=== Added File Zope3/src/zodb/zeo/stats.py ===
#! /usr/bin/env python
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Trace file statistics analyzer.

Usage: stats.py [-h] [-i interval] [-q] [-s] [-S] [-v] [-X] tracefile
-h: print histogram of object load frequencies
-i: summarizing interval in minutes (default 15; max 60)
-q: quiet; don't print summaries
-s: print histogram of object sizes
-S: don't print statistics
-v: verbose; print each record
-X: enable heuristic checking for misaligned records: oids > 2**32
    will be rejected; this requires the tracefile to be seekable
"""

"""File format:

Each record is 24 bytes, with the following layout.  Numbers are
big-endian integers.

Offset  Size  Contents

0       4     timestamp (seconds since 1/1/1970)
4       3     data size, in 256-byte increments, rounded up
7       1     code (see below)
8       8     object id
16      8     serial number

The code at offset 7 packs three fields:

Mask    bits  Contents

0x80    1     set if there was a non-empty version string
0x7e    6     function and outcome code
0x01    1     current cache file (0 or 1)

The function and outcome codes are documented in detail at the end of
this file in the 'explain' dictionary.  Note that the keys there (and
also the arguments to _trace() in ClientStorage.py) are 'code & 0x7e',
i.e. the low bit is always zero.
"""

import sys
import time
import getopt
import struct

def usage(msg):
    """Write msg, followed by this module's docstring, to standard error."""
    for text in (msg, __doc__):
        print >>sys.stderr, text

def main():
    """Analyze the trace file named on the command line and print reports.

    Options are taken from sys.argv[1:]; the module docstring describes
    them.  The file (plain, gzipped when the name ends in ".gz", or
    stdin when the name is "-") is read record by record, per-interval
    summaries are printed unless -q was given, and the overall
    statistics and optional histograms follow.

    Returns None on success, 1 when the file cannot be opened or holds
    no records, and 2 on a command-line usage error.  The result is
    meant to be passed to sys.exit().
    """
    # Parse options
    verbose = 0
    quiet = 0
    dostats = 1
    print_size_histogram = 0
    print_histogram = 0
    interval = 900 # Every 15 minutes
    heuristic = 0
    try:
        opts, args = getopt.getopt(sys.argv[1:], "hi:qsSvX")
    except getopt.error, msg:
        usage(msg)
        return 2
    for o, a in opts:
        if o == '-h':
            print_histogram = 1
        if o == "-i":
            # The interval is given in (possibly fractional) minutes.
            try:
                interval = int(60 * float(a))
            except (ValueError, OverflowError):
                # Report a bad number as a usage error, not a traceback.
                usage("-i requires a number of minutes, not %r" % a)
                return 2
            if interval <= 0:
                interval = 60
            elif interval > 3600:
                interval = 3600
        if o == "-q":
            quiet = 1
            verbose = 0
        if o == "-s":
            print_size_histogram = 1
        if o == "-S":
            dostats = 0
        if o == "-v":
            verbose = 1
        if o == '-X':
            heuristic = 1
    if len(args) != 1:
        usage("exactly one file argument required")
        return 2
    filename = args[0]

    # Open file
    if filename.endswith(".gz"):
        # Open gzipped file
        try:
            import gzip
        except ImportError:
            print >>sys.stderr,  "can't read gzipped files (no module gzip)"
            return 1
        try:
            f = gzip.open(filename, "rb")
        except IOError, msg:
            print >>sys.stderr,  "can't open %s: %s" % (filename, msg)
            return 1
    elif filename == '-':
        # Read from stdin
        f = sys.stdin
    else:
        # Open regular file
        try:
            f = open(filename, "rb")
        except IOError, msg:
            print >>sys.stderr,  "can't open %s: %s" % (filename, msg)
            return 1

    # Read file, gathering statistics, and printing each record if verbose
    rt0 = time.time()
    bycode = {}             # code -> number of records, whole file
    records = 0
    versions = 0
    t0 = te = None          # timestamps of first and last record
    datarecords = 0
    datasize = 0L
    file0 = file1 = 0       # records per current-cache-file bit
    oids = {}               # oid -> number of loads
    bysize = {}             # size -> {oid: count} for loads
    bysizew = {}            # size -> {oid: count} for updates
    total_loads = 0
    byinterval = {}         # code -> number of records, current interval
    thisinterval = None
    h0 = he = None          # first and last timestamp of current interval
    offset = 0
    # Bind to locals; these are called for every record.
    f_read = f.read
    struct_unpack = struct.unpack
    try:
        while 1:
            # Each 24-byte record is read as three 8-byte pieces.
            r = f_read(8)
            if len(r) < 8:
                break
            offset += 8
            # Timestamp, then a word holding the size and the code byte.
            ts, code = struct_unpack(">ii", r)
            if ts == 0:
                # Must be a misaligned record caused by a crash
                if not quiet:
                    print "Skipping 8 bytes at offset", offset-8,
                    print repr(r)
                continue
            oid = f_read(8)
            if len(oid) < 8:
                break
            if heuristic and oid[:4] != '\0\0\0\0':
                # Heuristic for severe data corruption
                print "Seeking back over bad oid at offset", offset,
                print repr(r)
                # Re-read these 8 bytes as the start of the next record.
                f.seek(-8, 1)
                continue
            offset += 8
            serial = f_read(8)
            if len(serial) < 8:
                break
            offset += 8
            records += 1
            if t0 is None:
                t0 = ts
                thisinterval = t0 / interval
                h0 = he = ts
            te = ts
            if ts / interval != thisinterval:
                # Crossed into a new interval: report the finished one.
                if not quiet:
                    dumpbyinterval(byinterval, h0, he)
                byinterval = {}
                thisinterval = ts / interval
                h0 = ts
            he = ts
            # The size is stored in 256-byte units in the upper bytes, so
            # masking without shifting yields the size in bytes.
            dlen, code = code & 0x7fffff00, code & 0xff
            if dlen:
                datarecords += 1
                datasize += dlen
            version = '-'
            if code & 0x80:
                version = 'V'
                versions += 1
            current = code & 1
            if current:
                file1 += 1
            else:
                file0 += 1
            # Keep only the function and outcome bits.
            code = code & 0x7e
            bycode[code] = bycode.get(code, 0) + 1
            byinterval[code] = byinterval.get(code, 0) + 1
            if dlen:
                if code & 0x70 == 0x20: # All loads
                    bysize[dlen] = d = bysize.get(dlen) or {}
                    d[oid] = d.get(oid, 0) + 1
                elif code == 0x3A: # Update
                    bysizew[dlen] = d = bysizew.get(dlen) or {}
                    d[oid] = d.get(oid, 0) + 1
            if verbose:
                print "%s %d %02x %016x %016x %1s %s" % (
                    time.ctime(ts)[4:-5],
                    current,
                    code,
                    u64(oid),
                    u64(serial),
                    version,
                    dlen and str(dlen) or "")
            if code & 0x70 == 0x20:
                oids[oid] = oids.get(oid, 0) + 1
                total_loads += 1
            if code in (0x00, 0x70):
                # A restart or cache flip also ends the current interval.
                if not quiet:
                    dumpbyinterval(byinterval, h0, he)
                byinterval = {}
                thisinterval = ts / interval
                h0 = he = ts
                if not quiet:
                    print time.ctime(ts)[4:-5],
                    if code == 0x00:
                        print '='*20, "Restart", '='*20
                    else:
                        print '-'*20, "Flip->%d" % current, '-'*20
    except KeyboardInterrupt:
        print "\nInterrupted.  Stats so far:\n"

    f.close()
    rte = time.time()
    if not quiet:
        dumpbyinterval(byinterval, h0, he)

    # Error if nothing was read
    if not records:
        print >>sys.stderr, "No records processed"
        return 1

    # Print statistics
    if dostats:
        if datarecords:
            avgsize = datasize / 1024.0 / datarecords
        else:
            # No record carried data; report 0.0 rather than divide by zero.
            avgsize = 0.0
        print
        print "Read %s records (%s bytes) in %.1f seconds" % (
            addcommas(records), addcommas(records*24), rte-rt0)
        print "Versions:   %s records used a version" % addcommas(versions)
        print "First time: %s" % time.ctime(t0)
        print "Last time:  %s" % time.ctime(te)
        print "Duration:   %s seconds" % addcommas(te-t0)
        print "File stats: %s in file 0; %s in file 1" % (
            addcommas(file0), addcommas(file1))
        print "Data recs:  %s (%.1f%%), average size %.1f KB" % (
            addcommas(datarecords),
            100.0 * datarecords / records,
            avgsize)
        print "Hit rate:   %.1f%% (load hits / loads)" % hitrate(bycode)
        print
        codes = bycode.keys()
        codes.sort()
        print "%13s %4s %s" % ("Count", "Code", "Function (action)")
        for code in codes:
            print "%13s  %02x  %s" % (
                addcommas(bycode.get(code, 0)),
                code,
                explain.get(code) or "*** unknown code ***")

    # Print histogram
    if print_histogram:
        print
        print "Histogram of object load frequency"
        total = len(oids)
        print "Unique oids: %s" % addcommas(total)
        print "Total loads: %s" % addcommas(total_loads)
        s = addcommas(total)
        width = max(len(s), len("objects"))
        fmt = "%5d %" + str(width) + "s %5.1f%% %5.1f%% %5.1f%%"
        hdr = "%5s %" + str(width) + "s %6s %6s %6s"
        print hdr % ("loads", "objects", "%obj", "%load", "%cum")
        cum = 0.0
        # The loop body only runs when oids is non-empty, so total and
        # total_loads are both non-zero here.
        for binsize, count in histogram(oids):
            obj_percent = 100.0 * count / total
            load_percent = 100.0 * count * binsize / total_loads
            cum += load_percent
            print fmt % (binsize, addcommas(count),
                         obj_percent, load_percent, cum)

    # Print size histogram
    if print_size_histogram:
        print
        print "Histograms of object sizes"
        print
        dumpbysize(bysizew, "written", "writes")
        dumpbysize(bysize, "loaded", "loads")

def dumpbysize(bysize, how, how2):
    print
    print "Unique sizes %s: %s" % (how, addcommas(len(bysize)))
    print "%10s %6s %6s" % ("size", "objs", how2)
    sizes = bysize.keys()
    sizes.sort()
    for size in sizes:
        loads = 0
        for n in bysize[size].itervalues():
            loads += n
        print "%10s %6d %6d" % (addcommas(size),
                                len(bysize.get(size, "")),
                                loads)

def dumpbyinterval(byinterval, h0, he):
    loads = 0
    hits = 0
    for code in byinterval.keys():
        if code & 0x70 == 0x20:
            n = byinterval[code]
            loads += n
            if code in (0x2A, 0x2C, 0x2E):
                hits += n
    if not loads:
        return
    if loads:
        hr = 100.0 * hits / loads
    else:
        hr = 0.0
    print "%s-%s %10s loads, %10s hits,%5.1f%% hit rate" % (
        time.ctime(h0)[4:-8], time.ctime(he)[14:-8],
        addcommas(loads), addcommas(hits), hr)

def hitrate(bycode):
    """Return load hits as a percentage of all loads (0.0 if no loads).

    bycode maps function/outcome codes to record counts.  Codes whose
    0x70 bits equal 0x20 are loads; 0x2A, 0x2C and 0x2E are the hits.
    """
    total = 0
    hit_count = 0
    for code, n in bycode.items():
        if code & 0x70 != 0x20:
            continue
        total += n
        if code in (0x2A, 0x2C, 0x2E):
            hit_count += n
    if not total:
        return 0.0
    return 100.0 * hit_count / total

def histogram(d):
    """Return sorted (value, count) pairs for the values of d.

    count is the number of keys in d that map to value.
    """
    tally = {}
    for value in d.itervalues():
        if value in tally:
            tally[value] += 1
        else:
            tally[value] = 1
    pairs = list(tally.items())
    pairs.sort()
    return pairs

def u64(v):
    """Decode an 8-byte big-endian string as an unsigned 64-bit integer."""
    (number,) = struct.unpack(">Q", v)
    return number

def addcommas(n):
    """Return str(n) with a comma between each group of three digits.

    A leading minus sign is kept in front of the grouped digits.
    """
    text = str(n)
    sign = ''
    if text[0] == '-':
        sign, text = '-', text[1:]
    # The leftmost group takes the one to three characters left over.
    first = len(text) % 3 or 3
    groups = [text[:first]]
    for pos in range(first, len(text), 3):
        groups.append(text[pos:pos + 3])
    return sign + ','.join(groups)

# Human-readable description of every function/outcome code.  The keys
# are trace codes with the version and cache-file bits removed
# (code & 0x7e); main() prints these descriptions in its per-code table
# and shows "*** unknown code ***" for any code missing here.
explain = {
    # The first hex digit shows the operation, the second the outcome.
    # If the second digit is in "02468" then it is a 'miss'.
    # If it is in "ACE" then it is a 'hit'.

    # 0x00 also marks a restart in the per-interval output.
    0x00: "_setup_trace (initialization)",

    0x10: "invalidate (miss)",
    0x1A: "invalidate (hit, version, writing 'n')",
    0x1C: "invalidate (hit, writing 'i')",

    # All 0x2? codes count as loads; 0x2A, 0x2C and 0x2E count as hits.
    0x20: "load (miss)",
    0x22: "load (miss, version, status 'n')",
    0x24: "load (miss, deleting index entry)",
    0x26: "load (miss, no non-version data)",
    0x28: "load (miss, version mismatch, no non-version data)",
    0x2A: "load (hit, returning non-version data)",
    0x2C: "load (hit, version mismatch, returning non-version data)",
    0x2E: "load (hit, returning version data)",

    # Counted in the "written" size histogram.
    0x3A: "update",

    0x40: "modifiedInVersion (miss)",
    0x4A: "modifiedInVersion (hit, return None, status 'n')",
    0x4C: "modifiedInVersion (hit, return '')",
    0x4E: "modifiedInVersion (hit, return version)",

    0x5A: "store (non-version data present)",
    0x5C: "store (only version data present)",

    0x6A: "_copytocurrent",

    # 0x70 also ends the current reporting interval ("Flip").
    0x70: "checkSize (cache flip)",
    }

# Script entry point: main() returns None, 1 or 2, which becomes the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())


=== Added File Zope3/src/zodb/zeo/stubs.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""RPC stubs for interface exported by ClientStorage."""

class ClientStorage:

    """An RPC stub class for the interface exported by ClientStorage.

    This is the interface presented by ClientStorage to the
    StorageServer; i.e. the StorageServer calls these methods and they
    are executed in the ClientStorage.

    See the ClientStorage class for documentation on these methods.

    It is currently important that all methods here are asynchronous
    (meaning they don't have a return value and the caller doesn't
    wait for them to complete), *and* that none of them cause any
    calls from the client to the storage.  This is due to limitations
    in the zrpc subpackage.

    The on-the-wire names of some of the methods don't match the
    Python method names.  That's because the on-the-wire protocol was
    fixed for ZEO 2 and we don't want to change it.  There are some
    aliases in ClientStorage.py to make up for this.
    """

    def __init__(self, rpc):
        """Constructor.

        The argument is a connection: an instance of the
        zrpc.connection.Connection class.
        """
        self.rpc = rpc

    def beginVerify(self):
        """Send the asynchronous call whose wire name is 'begin'."""
        self.rpc.callAsync('begin')

    def invalidateVerify(self, args):
        """Send args in the asynchronous call named 'invalidate'.

        Note the lower-case wire name; invalidateTrans() uses the
        capitalized 'Invalidate'.
        """
        self.rpc.callAsync('invalidate', args)

    def endVerify(self):
        """Send the asynchronous call whose wire name is 'end'."""
        self.rpc.callAsync('end')

    def invalidateTrans(self, args):
        """Send args in the asynchronous call named 'Invalidate'.

        The wire name is capitalized, unlike the 'invalidate' call
        sent by invalidateVerify().
        """
        self.rpc.callAsync('Invalidate', args)

    def serialnos(self, arg):
        """Send arg in the asynchronous call named 'serialnos'."""
        self.rpc.callAsync('serialnos', arg)

    def info(self, arg):
        """Send arg in the asynchronous call named 'info'."""
        self.rpc.callAsync('info', arg)

"""RPC stubs for interface exported by StorageServer."""

class StorageServer:

    """An RPC stub class for the interface exported by StorageServer.

    This is the interface presented by the StorageServer to the
    ClientStorage; i.e. the ClientStorage calls these methods and they
    are executed in the StorageServer.

    See the StorageServer module for documentation on these methods,
    with the exception of _update(), which is documented here.

    Most methods forward their arguments to rpc.call() under the same
    name and return its result.  beginZeoVerify, zeoVerify,
    endZeoVerify, storea and tpc_abort use rpc.callAsync() instead and
    return nothing.  Methods with an optional trailing argument leave
    it out of the call when it is None.
    """

    def __init__(self, rpc):
        """Constructor.

        The argument is a connection: an instance of the
        zrpc.connection.Connection class.
        """
        self.rpc = rpc

    def extensionMethod(self, name):
        """Return a callable that invokes the remote method called name.

        The callable is the bound call() method of a new
        ExtensionMethodWrapper.
        """
        return ExtensionMethodWrapper(self.rpc, name).call

    def _update(self):
        """Handle pending incoming messages.

        This method is typically only used when no asyncore mainloop
        is already active.  It can cause arbitrary callbacks from the
        server to the client to be handled.
        """
        self.rpc.pending()

    def register(self, storage_name, read_only):
        # Uses rpc.call rather than callAsync, but the result is not
        # returned to the caller.
        self.rpc.call('register', storage_name, read_only)

    def get_info(self):
        return self.rpc.call('get_info')

    # The three verification calls are sent with callAsync.

    def beginZeoVerify(self):
        self.rpc.callAsync('beginZeoVerify')

    def zeoVerify(self, oid, s, sv):
        self.rpc.callAsync('zeoVerify', oid, s, sv)

    def endZeoVerify(self):
        self.rpc.callAsync('endZeoVerify')

    def new_oids(self, n=None):
        # n is only sent when given.
        if n is None:
            return self.rpc.call('new_oids')
        else:
            return self.rpc.call('new_oids', n)

    def pack(self, t, wait=None):
        # wait is only sent when given; the result of the call is not
        # returned to the caller.
        if wait is None:
            self.rpc.call('pack', t)
        else:
            self.rpc.call('pack', t, wait)

    def zeoLoad(self, oid):
        return self.rpc.call('zeoLoad', oid)

    def storea(self, oid, serial, data, version, id):
        # Sent with callAsync, unlike store() below which returns the
        # result of rpc.call.
        self.rpc.callAsync('storea', oid, serial, data, version, id)

    def tpc_begin(self, id, user, descr, ext, tid, status):
        return self.rpc.call('tpc_begin', id, user, descr, ext, tid, status)

    def vote(self, trans_id):
        return self.rpc.call('vote', trans_id)

    def tpc_finish(self, id):
        return self.rpc.call('tpc_finish', id)

    def tpc_abort(self, id):
        # Sent with callAsync; nothing is returned.
        self.rpc.callAsync('tpc_abort', id)

    def abortVersion(self, src, id):
        return self.rpc.call('abortVersion', src, id)

    def commitVersion(self, src, dest, id):
        return self.rpc.call('commitVersion', src, dest, id)

    def history(self, oid, version, length=None):
        # length is only sent when given.
        if length is None:
            return self.rpc.call('history', oid, version)
        else:
            return self.rpc.call('history', oid, version, length)

    def load(self, oid, version):
        return self.rpc.call('load', oid, version)

    def loadSerial(self, oid, serial):
        return self.rpc.call('loadSerial', oid, serial)

    def modifiedInVersion(self, oid):
        return self.rpc.call('modifiedInVersion', oid)

    def new_oid(self, last=None):
        # last is only sent when given.
        if last is None:
            return self.rpc.call('new_oid')
        else:
            return self.rpc.call('new_oid', last)

    def store(self, oid, serial, data, version, trans):
        return self.rpc.call('store', oid, serial, data, version, trans)

    def transactionalUndo(self, trans_id, trans):
        return self.rpc.call('transactionalUndo', trans_id, trans)

    def undo(self, trans_id):
        return self.rpc.call('undo', trans_id)

    def undoLog(self, first, last):
        return self.rpc.call('undoLog', first, last)

    def undoInfo(self, first, last, spec):
        return self.rpc.call('undoInfo', first, last, spec)

    def versionEmpty(self, vers):
        return self.rpc.call('versionEmpty', vers)

    def versions(self, max=None):
        # max is only sent when given.
        if max is None:
            return self.rpc.call('versions')
        else:
            return self.rpc.call('versions', max)

class ExtensionMethodWrapper:
    """Bind a connection and a remote method name into one callable.

    rpc is an object with a call(name, *args, **kwargs) method and name
    is the remote method name that is passed as its first argument.
    """

    def __init__(self, rpc, name):
        self.rpc = rpc
        self.name = name

    def call(self, *a, **kwa):
        """Invoke the named method through rpc.call and return its result.

        Positional and keyword arguments are forwarded unchanged after
        the method name.
        """
        # Extended call syntax replaces the deprecated apply() builtin.
        return self.rpc.call(self.name, *a, **kwa)


=== Added File Zope3/src/zodb/zeo/tbuf.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A TransactionBuffer store transaction updates until commit or abort.

A transaction may generate enough data that it is not practical to
always hold pending updates in memory.  Instead, a TransactionBuffer
is used to store the data until a commit or abort.
"""

# A faster implementation might store trans data in memory until it
# reaches a certain size.

import cPickle
import tempfile
from threading import Lock

class TransactionBuffer:
    """File-backed queue of (oid, version, data) records.

    Records are pickled one after another into a temporary file and
    read back in the same order.  Every public method except get_size()
    holds self.lock while it works.
    """

    # Valid call sequences:
    #
    #     ((store | invalidate)* begin_iterate next* clear)* close
    #
    # get_size can be called any time

    # The TransactionBuffer is used by client storage to hold update
    # data until the tpc_finish().  It is normally used by a single
    # thread, because only one thread can be in the two-phase commit
    # at one time.

    # It is possible, however, for one thread to close the storage
    # while another thread is in the two-phase commit.  We must use
    # a lock to guard against this race, because unpredictable things
    # can happen in Python if one thread closes a file that another
    # thread is reading.  In a debug build, an assert() can fail.

    # XXX If an operation is performed on a closed TransactionBuffer,
    # it has no effect and does not raise an exception.  The only time
    # this should occur is when a ClientStorage is closed in one
    # thread while another thread is in its tpc_finish().  It's not
    # clear what should happen in this case.  If the tpc_finish()
    # completes without error, the Connection using it could have
    # inconsistent data.  This should have minimal effect, though,
    # because the Connection is connected to a closed storage.

    def __init__(self):
        """Create the temporary file, the lock and an empty record count."""
        self.file = tempfile.TemporaryFile(suffix=".tbuf")
        self.lock = Lock()
        self.closed = 0
        # Number of records written and not yet read back.
        self.count = 0
        # Running size estimate, see _store() and get_size().
        self.size = 0
        # It's safe to use a fast pickler because the only objects
        # stored are builtin types -- strings or None.
        self.pickler = cPickle.Pickler(self.file, 1)
        self.pickler.fast = 1

    def close(self):
        """Mark the buffer closed and close the temporary file.

        An OSError raised while closing the file is ignored.
        """
        self.lock.acquire()
        try:
            self.closed = 1
            try:
                self.file.close()
            except OSError:
                pass
        finally:
            self.lock.release()

    def store(self, oid, version, data):
        """Append a data record; locked wrapper around _store()."""
        self.lock.acquire()
        try:
            self._store(oid, version, data)
        finally:
            self.lock.release()

    def _store(self, oid, version, data):
        """Store oid, version, data for later retrieval"""
        # Silently do nothing once the buffer has been closed.
        if self.closed:
            return
        self.pickler.dump((oid, version, data))
        self.count += 1
        # Estimate per-record cache size
        self.size = self.size + len(data) + 31
        if version:
            # Assume version data has same size as non-version data
            self.size = self.size + len(version) + len(data) + 12

    def invalidate(self, oid, version):
        """Append a record whose data is None for oid and version.

        The record is counted, but the size estimate is not changed.
        Nothing happens if the buffer is closed.
        """
        self.lock.acquire()
        try:
            if self.closed:
                return
            self.pickler.dump((oid, version, None))
            self.count += 1
        finally:
            self.lock.release()

    def clear(self):
        """Mark the buffer as empty"""
        self.lock.acquire()
        try:
            if self.closed:
                return
            # Rewind the file and reset the counters; the file itself
            # is not truncated.
            self.file.seek(0)
            self.count = 0
            self.size = 0
        finally:
            self.lock.release()

    # unchecked constraints:
    # 1. can't call store() after begin_iterate()
    # 2. must call clear() after iteration finishes

    def begin_iterate(self):
        """Move the file pointer in advance of iteration"""
        self.lock.acquire()
        try:
            if self.closed:
                return
            # Flush the file, rewind it, and read it back with a new
            # unpickler.
            self.file.flush()
            self.file.seek(0)
            self.unpickler = cPickle.Unpickler(self.file)
        finally:
            self.lock.release()

    def next(self):
        """Return the next record; locked wrapper around _next()."""
        self.lock.acquire()
        try:
            return self._next()
        finally:
            self.lock.release()

    def _next(self):
        """Return next tuple of data or None if EOF"""
        if self.closed:
            return None
        if self.count == 0:
            # All stored records have been returned; drop the unpickler.
            del self.unpickler
            return None
        oid_ver_data = self.unpickler.load()
        self.count -= 1
        return oid_ver_data

    def get_size(self):
        """Return size of data stored in buffer (just a hint)."""

        # Read without taking the lock.
        return self.size


=== Added File Zope3/src/zodb/zeo/utils.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Utilities for setting up the server environment."""

import os

def parentdir(p, n=1):
    """Return the parent of p, n levels up.

    Each step takes the directory part of the path.  When that part is
    empty or '.', the current working directory is used instead.  With
    n of zero or less, p is returned unchanged.
    """
    d = p
    # "n > 0" rather than "n" so a negative or fractional count cannot
    # loop forever.
    while n > 0:
        d = os.path.split(d)[0]
        if not d or d == '.':
            d = os.getcwd()
        n -= 1
    return d

class Environment:
    """Work out where the Data.fs and ZEO_SERVER.pid files live.

    Construct with the argv[0] that was used to start ZEO, then read
    the attributes:

    home    -- the instance home directory
    var     -- home/var if that directory exists, else home
    zeo_pid -- pid file name ($ZEO_SERVER_PID overrides the default)
    fs      -- var/Data.fs
    """

    def __init__(self, argv0):
        home = os.environ.get("INSTANCE_HOME")
        if home is None:
            # No INSTANCE_HOME: assume this code is installed in
            # Zope/lib/python/ZEO and look for a Zope/var directory
            # four levels above argv0, falling back to the current
            # directory.
            home = parentdir(argv0, 4)
            if not os.path.isdir(os.path.join(home, "var")):
                home = os.getcwd()

        vardir = os.path.join(home, "var")
        if not os.path.isdir(vardir):
            vardir = home

        self.home = home
        self.var = vardir
        self.zeo_pid = os.environ.get(
            "ZEO_SERVER_PID", os.path.join(vardir, "ZEO_SERVER.pid"))
        self.fs = os.path.join(vardir, "Data.fs")