[Zope3-checkins] CVS: Zope3/src/zodb/zeo/tests - __init__.py:1.2 cache.py:1.2 commitlock.py:1.2 connection.py:1.2 forker.py:1.2 multi.py:1.2 speed.py:1.2 stress.py:1.2 test_cache.py:1.2 test_conn.py:1.2 test_tbuf.py:1.2 test_zeo.py:1.2 threadtests.py:1.2 zeoserver.py:1.2

Jim Fulton jim@zope.com
Wed, 25 Dec 2002 09:13:55 -0500


Update of /cvs-repository/Zope3/src/zodb/zeo/tests
In directory cvs.zope.org:/tmp/cvs-serv15352/src/zodb/zeo/tests

Added Files:
	__init__.py cache.py commitlock.py connection.py forker.py 
	multi.py speed.py stress.py test_cache.py test_conn.py 
	test_tbuf.py test_zeo.py threadtests.py zeoserver.py 
Log Message:
Grand renaming:

- Renamed most files (especially python modules) to lower case.

- Moved views and interfaces into separate hierarchies within each
  project, where each top-level directory under the zope package
  is a separate project.

- Moved everything to src from lib/python.

  lib/python will eventually go away. I need access to the cvs
  repository to make this happen, however.

There are probably some bits that are broken. All tests pass
and zope runs, but I haven't tried everything. There are a number
of cleanups I'll work on tomorrow.



=== Zope3/src/zodb/zeo/tests/__init__.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/tests/__init__.py	Wed Dec 25 09:12:22 2002
@@ -0,0 +1,13 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################


=== Zope3/src/zodb/zeo/tests/cache.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/tests/cache.py	Wed Dec 25 09:12:22 2002
@@ -0,0 +1,105 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Tests of the ZEO cache"""
+
+from zodb.ztransaction import Transaction
+from zodb.storage.tests.minpo import MinPO
+from zodb.storage.tests.base import zodb_unpickle
+
+class TransUndoStorageWithCache:
+
+    def checkUndoInvalidation(self):
+        oid = self._storage.new_oid()
+        revid = self._dostore(oid, data=MinPO(23))
+        revid = self._dostore(oid, revid=revid, data=MinPO(24))
+        revid = self._dostore(oid, revid=revid, data=MinPO(25))
+
+        info = self._storage.undoInfo()
+        if not info:
+            # XXX perhaps we have an old storage implementation that
+            # does do the negative nonsense
+            info = self._storage.undoInfo(0, 20)
+        tid = info[0]['id']
+
+        # We may need to bail at this point if the storage doesn't
+        # support transactional undo
+        if not self._storage.supportsTransactionalUndo():
+            return
+
+        # Now start an undo transaction
+        t = Transaction()
+        t.note('undo1')
+        self._storage.tpc_begin(t)
+
+        oids = self._storage.transactionalUndo(tid, t)
+
+        # Make sure this doesn't load invalid data into the cache
+        self._storage.load(oid, '')
+
+        self._storage.tpc_vote(t)
+        self._storage.tpc_finish(t)
+
+        assert len(oids) == 1
+        assert oids[0] == oid
+        data, revid = self._storage.load(oid, '')
+        obj = zodb_unpickle(data)
+        assert obj == MinPO(24)
+
+class StorageWithCache:
+
+    def checkAbortVersionInvalidation(self):
+        oid = self._storage.new_oid()
+        revid = self._dostore(oid, data=MinPO(1))
+        revid = self._dostore(oid, revid=revid, data=MinPO(2))
+        revid = self._dostore(oid, revid=revid, data=MinPO(3), version="foo")
+        revid = self._dostore(oid, revid=revid, data=MinPO(4), version="foo")
+        t = Transaction()
+        self._storage.tpc_begin(t)
+        self._storage.abortVersion("foo", t)
+        self._storage.load(oid, "foo")
+        self._storage.tpc_vote(t)
+        self._storage.tpc_finish(t)
+        data, revid = self._storage.load(oid, "foo")
+        obj = zodb_unpickle(data)
+        assert obj == MinPO(2), obj
+
+    def checkCommitEmptyVersionInvalidation(self):
+        oid = self._storage.new_oid()
+        revid = self._dostore(oid, data=MinPO(1))
+        revid = self._dostore(oid, revid=revid, data=MinPO(2))
+        revid = self._dostore(oid, revid=revid, data=MinPO(3), version="foo")
+        t = Transaction()
+        self._storage.tpc_begin(t)
+        self._storage.commitVersion("foo", "", t)
+        self._storage.load(oid, "")
+        self._storage.tpc_vote(t)
+        self._storage.tpc_finish(t)
+        data, revid = self._storage.load(oid, "")
+        obj = zodb_unpickle(data)
+        assert obj == MinPO(3), obj
+
+    def checkCommitVersionInvalidation(self):
+        oid = self._storage.new_oid()
+        revid = self._dostore(oid, data=MinPO(1))
+        revid = self._dostore(oid, revid=revid, data=MinPO(2))
+        revid = self._dostore(oid, revid=revid, data=MinPO(3), version="foo")
+        t = Transaction()
+        self._storage.tpc_begin(t)
+        self._storage.commitVersion("foo", "bar", t)
+        self._storage.load(oid, "")
+        self._storage.tpc_vote(t)
+        self._storage.tpc_finish(t)
+        data, revid = self._storage.load(oid, "bar")
+        obj = zodb_unpickle(data)
+        assert obj == MinPO(3), obj


=== Zope3/src/zodb/zeo/tests/commitlock.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/tests/commitlock.py	Wed Dec 25 09:12:22 2002
@@ -0,0 +1,219 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Tests of the distributed commit lock."""
+
+import threading
+import time
+
+from zodb.ztransaction import Transaction
+from zodb.timestamp import TimeStamp
+from zodb.storage.tests.base import zodb_pickle, MinPO
+
+from zodb.zeo.client import ClientStorage
+from zodb.zeo.interfaces import Disconnected
+
+ZERO = '\0'*8
+
+class DummyDB:
+    def invalidate(self, *args):
+        pass
+
+class TestThread(threading.Thread):
+    __super_init = threading.Thread.__init__
+    __super_run = threading.Thread.run
+
+    def __init__(self, testcase, group=None, target=None, name=None,
+                 args=(), kwargs={}, verbose=None):
+        self.__super_init(group, target, name, args, kwargs, verbose)
+        self.setDaemon(1)
+        self._testcase = testcase
+
+    def run(self):
+        try:
+            self.testrun()
+        except Exception:
+            s = StringIO()
+            traceback.print_exc(file=s)
+            self._testcase.fail("Exception in thread %s:\n%s\n" %
+                                (self, s.getvalue()))
+
+    def cleanup(self, timeout=15):
+        self.join(timeout)
+        if self.isAlive():
+            self._testcase.fail("Thread did not finish: %s" % self)
+
+class WorkerThread(TestThread):
+
+    # run the entire test in a thread so that the blocking call for
+    # tpc_vote() doesn't hang the test suite.
+
+    def __init__(self, testcase, storage, trans, method="tpc_finish"):
+        self.storage = storage
+        self.trans = trans
+        self.method = method
+        self.ready = threading.Event()
+        TestThread.__init__(self, testcase)
+
+    def testrun(self):
+        try:
+            self.storage.tpc_begin(self.trans)
+            oid = self.storage.new_oid()
+            p = zodb_pickle(MinPO("c"))
+            self.storage.store(oid, ZERO, p, '', self.trans)
+            oid = self.storage.new_oid()
+            p = zodb_pickle(MinPO("c"))
+            self.storage.store(oid, ZERO, p, '', self.trans)
+            self.ready.set()
+            self.storage.tpc_vote(self.trans)
+            if self.method == "tpc_finish":
+                self.storage.tpc_finish(self.trans)
+            else:
+                self.storage.tpc_abort(self.trans)
+        except Disconnected:
+            pass
+
+class CommitLockTests:
+
+    # The commit lock tests verify that the storage successfully
+    # blocks and restarts transactions when there is content for a
+    # single storage.  There are a lot of cases to cover.
+
+    # CommitLock1 checks the case where a single transaction delays
+    # other transactions before they actually block.  IOW, by the time
+    # the other transactions get to the vote stage, the first
+    # transaction has finished.
+
+    def checkCommitLock1OnCommit(self):
+        self._storages = []
+        try:
+            self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
+        finally:
+            self._cleanup()
+
+    def checkCommitLock1OnAbort(self):
+        self._storages = []
+        try:
+            self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
+        finally:
+            self._cleanup()
+
+    def checkCommitLock2OnCommit(self):
+        self._storages = []
+        try:
+            self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2)
+        finally:
+            self._cleanup()
+
+    def checkCommitLock2OnAbort(self):
+        self._storages = []
+        try:
+            self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2)
+        finally:
+            self._cleanup()
+
+    def _cleanup(self):
+        for store, trans in self._storages:
+            store.tpc_abort(trans)
+            store.close()
+        self._storages = []
+
+    def _checkCommitLock(self, method_name, dosetup, dowork):
+        # check the commit lock when a client attemps a transaction,
+        # but fails/exits before finishing the commit.
+
+        # The general flow of these tests is to start a transaction by
+        # calling tpc_begin().  Then begin one or more other
+        # connections that also want to commit.  This causes the
+        # commit lock code to be exercised.  Once the other
+        # connections are started, the first transaction completes.
+        # Either by commit or abort, depending on whether method_name
+        # is "tpc_finish."
+
+        # The tests are parameterized by method_name, dosetup(), and
+        # dowork().  The dosetup() function is called with a
+        # connectioned client storage, transaction, and timestamp.
+        # Any work it does occurs after the first transaction has
+        # started, but before it finishes.  The dowork() function
+        # executes after the first transaction has completed.
+
+        # Start on transaction normally and get the lock.
+        t = Transaction()
+        self._storage.tpc_begin(t)
+        oid = self._storage.new_oid()
+        self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
+        self._storage.tpc_vote(t)
+
+        # Start a second transaction on a different connection without
+        # blocking the test thread.
+        self._storages = []
+        for i in range(4):
+            storage2 = self._duplicate_client()
+            t2 = Transaction()
+            tid = self._get_timestamp()
+            dosetup(storage2, t2, tid)
+            if i == 0:
+                storage2.close()
+            else:
+                self._storages.append((storage2, t2))
+
+        if method_name == "tpc_finish":
+            self._storage.tpc_finish(t)
+            self._storage.load(oid, '')
+        else:
+            self._storage.tpc_abort(t)
+
+        dowork(method_name)
+
+        # Make sure the server is still responsive
+        self._dostore()
+
+    def _dosetup1(self, storage, trans, tid):
+        storage.tpc_begin(trans, tid)
+
+    def _dowork1(self, method_name):
+        for store, trans in self._storages:
+            oid = store.new_oid()
+            store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
+            store.tpc_vote(trans)
+            if method_name == "tpc_finish":
+                store.tpc_finish(trans)
+            else:
+                store.tpc_abort(trans)
+
+    def _dosetup2(self, storage, trans, tid):
+        self._threads = []
+        t = WorkerThread(self, storage, trans)
+        self._threads.append(t)
+        t.start()
+        t.ready.wait()
+
+    def _dowork2(self, method_name):
+        for t in self._threads:
+            t.cleanup()
+
+    def _duplicate_client(self):
+        "Open another ClientStorage to the same server."
+        # XXX argh it's hard to find the actual address
+        # The rpc mgr addr attribute is a list.  Each element in the
+        # list is a socket domain (AF_INET, AF_UNIX, etc.) and an
+        # address.
+        addr = self._storage._addr
+        new = ClientStorage(addr, wait=1)
+        new.registerDB(DummyDB())
+        return new
+
+    def _get_timestamp(self):
+        t = time.time()
+        ts = TimeStamp(*(time.gmtime(t)[:5] + (t % 60,)))
+        return ts.raw()


=== Zope3/src/zodb/zeo/tests/connection.py 1.1 => 1.2 === (533/633 lines abridged)
--- /dev/null	Wed Dec 25 09:13:53 2002
+++ Zope3/src/zodb/zeo/tests/connection.py	Wed Dec 25 09:12:22 2002
@@ -0,0 +1,630 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+
+import os
+import sys
+import time
+import random
+import select
+import socket
+import asyncore
+import tempfile
+import threading
+import logging
+
+from zodb.zeo.client import ClientStorage
+from zodb.zeo.interfaces import Disconnected
+from zodb.zeo.zrpc.marshal import Marshaller
+from zodb.zeo.tests import forker
+
+from transaction import get_transaction
+from zodb.interfaces import ReadOnlyError
+from zodb.ztransaction import Transaction
+from zodb.storage.tests.base import StorageTestBase
+from zodb.storage.tests.minpo import MinPO
+from zodb.storage.tests.base import zodb_pickle, zodb_unpickle
+from zodb.storage.tests.base import handle_all_serials, ZERO
+
+
+class DummyDB:
+    def invalidate(self, *args, **kws):
+        pass
+
+
+class CommonSetupTearDown(StorageTestBase):
+    """Tests that explicitly manage the server process.
+

[-=- -=- -=- 533 lines omitted -=- -=- -=-]

+            # expanded in-line (mostly).
+
+            # Create oid->serial mappings
+            for c in clients:
+                c.__oids = []
+                c.__serials = {}
+
+            # Begin a transaction
+            t = Transaction()
+            for c in clients:
+                #print "%s.%s.%s begin\n" % (tname, c.__name, i),
+                c.tpc_begin(t)
+
+            for j in range(testcase.nobj):
+                for c in clients:
+                    # Create and store a new object on each server
+                    oid = c.new_oid()
+                    c.__oids.append(oid)
+                    data = MinPO("%s.%s.t%d.o%d" % (tname, c.__name, i, j))
+                    #print data.value
+                    data = zodb_pickle(data)
+                    s = c.store(oid, ZERO, data, '', t)
+                    c.__serials.update(handle_all_serials(oid, s))
+
+            # Vote on all servers and handle serials
+            for c in clients:
+                #print "%s.%s.%s vote\n" % (tname, c.__name, i),
+                s = c.tpc_vote(t)
+                c.__serials.update(handle_all_serials(None, s))
+
+            # Finish on all servers
+            for c in clients:
+                #print "%s.%s.%s finish\n" % (tname, c.__name, i),
+                c.tpc_finish(t)
+
+            for c in clients:
+                # Check that we got serials for all oids
+                for oid in c.__oids:
+                    testcase.failUnless(c.__serials.has_key(oid))
+                # Check that we got serials for no other oids
+                for oid in c.__serials.keys():
+                    testcase.failUnless(oid in c.__oids)
+
+    def closeclients(self):
+        # Close clients opened by run()
+        for c in self.clients:
+            try:
+                c.close()
+            except:
+                pass


=== Zope3/src/zodb/zeo/tests/forker.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/tests/forker.py	Wed Dec 25 09:12:22 2002
@@ -0,0 +1,111 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Library for forking storage server and connecting client storage"""
+
+import os
+import sys
+import time
+import errno
+import random
+import socket
+import tempfile
+import traceback
+import logging
+
+# Change value of PROFILE to enable server-side profiling
+PROFILE = False
+if PROFILE:
+    import hotshot
+
+
+def get_port():
+    """Return a port that is not in use.
+
+    Checks if a port is in use by trying to connect to it.  Assumes it
+    is not in use if connect raises an exception.
+
+    Raises RuntimeError after 10 tries.
+    """
+    for i in range(10):
+        port = random.randrange(20000, 30000)
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            try:
+                s.connect(('localhost', port))
+            except socket.error:
+                # XXX check value of error?
+                return port
+        finally:
+            s.close()
+    raise RuntimeError, "Can't find port"
+
+
+def start_zeo_server(conf, addr=None, ro_svr=False, keep=False):
+    """Start a ZEO server in a separate process.
+
+    Returns the ZEO port, the test server port, and the pid.
+    """
+    # Store the config info in a temp file.
+    tmpfile = tempfile.mktemp()
+    fp = open(tmpfile, 'w')
+    fp.write(conf)
+    fp.close()
+    # Create the server
+    import zodb.zeo.tests.zeoserver
+    if addr is None:
+        port = get_port()
+    else:
+        port = addr[1]
+    script = zodb.zeo.tests.zeoserver.__file__
+    if script.endswith('.pyc'):
+        script = script[:-1]
+    # Create a list of arguments, which we'll tuplify below
+    args = [sys.executable, script, '-C', tmpfile]
+    if ro_svr:
+        args.append('-r')
+    if keep:
+        args.append('-k')
+    args.append(str(port))
+    d = os.environ.copy()
+    d['PYTHONPATH'] = os.pathsep.join(sys.path)
+    pid = os.spawnve(os.P_NOWAIT, sys.executable, tuple(args), d)
+    adminaddr = ('localhost', port+1)
+    # We need to wait until the server starts, but not forever
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    for i in range(5):
+        try:
+            logging.debug('forker: connect %s', i)
+            s.connect(adminaddr)
+            ack = s.recv(1024)
+            logging.debug('forker: acked: %s', ack)
+            break
+        except socket.error, e:
+            if e[0] <> errno.ECONNREFUSED: raise
+            time.sleep(1)
+    else:
+        logging.debug('forker: boo hoo')
+        raise
+    return ('localhost', port), adminaddr, pid
+
+
+def shutdown_zeo_server(adminaddr):
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    s.connect(adminaddr)
+    try:
+        ack = s.recv(1024)
+    except socket.error, e:
+        if e[0] <> errno.ECONNRESET: raise
+        ack = 'no ack received'
+    logging.debug('shutdownServer: acked: %s', ack)
+    s.close()


=== Zope3/src/zodb/zeo/tests/multi.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/tests/multi.py	Wed Dec 25 09:12:22 2002
@@ -0,0 +1,160 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""A multi-client test of the ZEO storage server"""
+# XXX This code is currently broken.
+
+import zodb.db
+from zodb.storage.file import FileStorage
+import zodb.interfaces
+import persistence
+import PersistentMapping
+from zodb.zeo.tests import forker
+
+import asyncore
+import os
+import tempfile
+import time
+import types
+
+VERBOSE = 1
+CLIENTS = 4
+RECORDS_PER_CLIENT = 100
+CONFLICT_DELAY = 0.1
+CONNECT_DELAY = 0.1
+CLIENT_CACHE = '' # use temporary cache
+
+class Record(Persistence.Persistent):
+    def __init__(self, client=None, value=None):
+        self.client = client
+        self.value = None
+        self.next = None
+
+    def set_next(self, next):
+        self.next = next
+
+class Stats(Persistence.Persistent):
+    def __init__(self):
+        self.begin = time.time()
+        self.end = None
+
+    def done(self):
+        self.end = time.time()
+
+def init_storage():
+    path = tempfile.mktemp()
+    if VERBOSE:
+        print "FileStorage path:", path
+    fs = FileStorage(path)
+
+    db = zodb.db(fs)
+    root = db.open().root()
+    root["multi"] = PersistentMapping.PersistentMapping()
+    get_transaction().commit()
+
+    return fs
+
+def start_server(addr):
+    storage = init_storage()
+    pid, exit = forker.start_zeo_server(storage, addr)
+    return pid, exit
+
+def start_client(addr, client_func=None):
+    pid = os.fork()
+    if pid == 0:
+        try:
+            import ZEO.ClientStorage
+            if VERBOSE:
+                print "Client process started:", os.getpid()
+            cli = ZEO.ClientStorage.ClientStorage(addr, client=CLIENT_CACHE)
+            if client_func is None:
+                run(cli)
+            else:
+                client_func(cli)
+            cli.close()
+        finally:
+            os._exit(0)
+    else:
+        return pid
+
+def run(storage):
+    if hasattr(storage, 'is_connected'):
+        while not storage.is_connected():
+            time.sleep(CONNECT_DELAY)
+    pid = os.getpid()
+    print "Client process connected:", pid, storage
+    db = zodb.db(storage)
+    root = db.open().root()
+    while 1:
+        try:
+            s = root[pid] = Stats()
+            get_transaction().commit()
+        except zodb.interfaces.ConflictError:
+            get_transaction().abort()
+            time.sleep(CONFLICT_DELAY)
+        else:
+            break
+
+    dict = root["multi"]
+    prev = None
+    i = 0
+    while i < RECORDS_PER_CLIENT:
+        try:
+            size = len(dict)
+            r = dict[size] = Record(pid, size)
+            if prev:
+                prev.set_next(r)
+            get_transaction().commit()
+        except zodb.interfaces.ConflictError, err:
+            get_transaction().abort()
+            time.sleep(CONFLICT_DELAY)
+        else:
+            i = i + 1
+            if VERBOSE and (i < 5 or i % 10 == 0):
+                print "Client %s: %s of %s" % (pid, i, RECORDS_PER_CLIENT)
+    s.done()
+    get_transaction().commit()
+
+    print "Client completed:", pid
+
+def main(client_func=None):
+    if VERBOSE:
+        print "Main process:", os.getpid()
+    addr = tempfile.mktemp()
+    t0 = time.time()
+    server_pid, server = start_server(addr)
+    t1 = time.time()
+    pids = []
+    for i in range(CLIENTS):
+        pids.append(start_client(addr, client_func))
+    for pid in pids:
+        assert type(pid) == types.IntType, "invalid pid type: %s (%s)" % \
+               (repr(pid), type(pid))
+        try:
+            if VERBOSE:
+                print "waitpid(%s)" % repr(pid)
+            os.waitpid(pid, 0)
+        except os.error, err:
+            print "waitpid(%s) failed: %s" % (repr(pid), err)
+    t2 = time.time()
+    server.close()
+    os.waitpid(server_pid, 0)
+
+    # XXX Should check that the results are consistent!
+
+    print "Total time:", t2 - t0
+    print "Server start time", t1 - t0
+    print "Client time:", t2 - t1
+
+if __name__ == "__main__":
+    main()


=== Zope3/src/zodb/zeo/tests/speed.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/tests/speed.py	Wed Dec 25 09:12:22 2002
@@ -0,0 +1,215 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+usage="""Test speed of a ZODB storage
+
+Options:
+
+    -d file    The data file to use as input.
+               The default is this script.
+
+    -n n       The number of repititions
+
+    -s module  A module that defines a 'Storage'
+               attribute, which is an open storage.
+               If not specified, a FileStorage will ne
+               used.
+
+    -z         Test compressing data
+
+    -D         Run in debug mode
+
+    -L         Test loads as well as stores by minimizing
+               the cache after eachrun
+
+    -M         Output means only
+
+    -C         Run with a persistent client cache
+
+    -U         Run ZEO using a Unix domain socket
+
+    -t n       Number of concurrent threads to run.
+"""
+
+import asyncore
+import sys, os, getopt, string, time
+##sys.path.insert(0, os.getcwd())
+
+import zodb.db
+import persistence
+import ZEO.ClientStorage, ZEO.StorageServer
+from zodb.zeo.tests import forker
+from zodb.interfaces import ConflictError
+
+class P(Persistence.Persistent):
+    pass
+
+fs_name = "zeo-speed.fs"
+
+class ZEOExit(asyncore.file_dispatcher):
+    """Used to exit ZEO.StorageServer when run is done"""
+    def writable(self):
+        return 0
+    def readable(self):
+        return 1
+    def handle_read(self):
+        buf = self.recv(4)
+        assert buf == "done"
+        self.delete_fs()
+        os._exit(0)
+    def handle_close(self):
+        print "Parent process exited unexpectedly"
+        self.delete_fs()
+        os._exit(0)
+    def delete_fs(self):
+        os.unlink(fs_name)
+        os.unlink(fs_name + ".lock")
+        os.unlink(fs_name + ".tmp")
+
+def work(db, results, nrep, compress, data, detailed, minimize, threadno=None):
+    for j in range(nrep):
+        for r in 1, 10, 100, 1000:
+            t = time.time()
+            conflicts = 0
+
+            jar = db.open()
+            while 1:
+                try:
+                    get_transaction().begin()
+                    rt = jar.root()
+                    key = 's%s' % r
+                    if rt.has_key(key):
+                        p = rt[key]
+                    else:
+                        rt[key] = p =P()
+                    for i in range(r):
+                        v = getattr(p, str(i), P())
+                        if compress is not None:
+                            v.d = compress(data)
+                        else:
+                            v.d = data
+                        setattr(p, str(i), v)
+                    get_transaction().commit()
+                except ConflictError:
+                    conflicts = conflicts + 1
+                else:
+                    break
+            jar.close()
+
+            t = time.time() - t
+            if detailed:
+                if threadno is None:
+                    print "%s\t%s\t%.4f\t%d" % (j, r, t, conflicts)
+                else:
+                    print "%s\t%s\t%.4f\t%d\t%d" % (j, r, t, conflicts,
+                                                    threadno)
+            results[r].append((t, conflicts))
+            rt=d=p=v=None # release all references
+            if minimize:
+                time.sleep(3)
+                jar.cacheMinimize(3)
+
+def main(args):
+    opts, args = getopt.getopt(args, 'zd:n:Ds:LMt:U')
+    s = None
+    compress = None
+    data=sys.argv[0]
+    nrep=5
+    minimize=0
+    detailed=1
+    cache = None
+    domain = 'AF_INET'
+    threads = 1
+    for o, v in opts:
+        if o=='-n': nrep = int(v)
+        elif o=='-d': data = v
+        elif o=='-s': s = v
+        elif o=='-z':
+            import zlib
+            compress = zlib.compress
+        elif o=='-L':
+            minimize=1
+        elif o=='-M':
+            detailed=0
+        elif o=='-D':
+            global debug
+            os.environ['STUPID_LOG_FILE']=''
+            os.environ['STUPID_LOG_SEVERITY']='-999'
+            debug = 1
+        elif o == '-C':
+            cache = 'speed'
+        elif o == '-U':
+            domain = 'AF_UNIX'
+        elif o == '-t':
+            threads = int(v)
+
+    zeo_pipe = None
+    if s:
+        s = __import__(s, globals(), globals(), ('__doc__',))
+        s = s.Storage
+        server = None
+    else:
+        s, server, pid = forker.start_zeo("FileStorage",
+                                          (fs_name, 1), domain=domain)
+
+    data=open(data).read()
+    db=zodb.db(s,
+               # disable cache deactivation
+               cache_size=4000,
+               cache_deactivate_after=6000,)
+
+    print "Beginning work..."
+    results={1:[], 10:[], 100:[], 1000:[]}
+    if threads > 1:
+        import threading
+        l = []
+        for i in range(threads):
+            t = threading.Thread(target=work,
+                                 args=(db, results, nrep, compress, data,
+                                       detailed, minimize, i))
+            l.append(t)
+        for t in l:
+            t.start()
+        for t in l:
+            t.join()
+
+    else:
+        work(db, results, nrep, compress, data, detailed, minimize)
+
+    if server is not None:
+        server.close()
+        os.waitpid(pid, 0)
+
+    if detailed:
+        print '-'*24
+    print "num\tmean\tmin\tmax"
+    for r in 1, 10, 100, 1000:
+        times = []
+        for time, conf in results[r]:
+            times.append(time)
+        t = mean(times)
+        print "%d\t%.4f\t%.4f\t%.4f" % (r, t, min(times), max(times))
+
+def mean(l):
+    tot = 0
+    for v in l:
+        tot = tot + v
+    return tot / len(l)
+
+##def compress(s):
+##    c = zlib.compressobj()
+##    o = c.compress(s)
+##    return o + c.flush()
+
+if __name__=='__main__':
+    main(sys.argv[1:])


=== Zope3/src/zodb/zeo/tests/stress.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/tests/stress.py	Wed Dec 25 09:12:22 2002
@@ -0,0 +1,139 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""A ZEO client-server stress test to look for leaks.
+
+The stress test should run in an infinite loop and should involve
+multiple connections.
+"""
+# XXX This code is currently broken.
+
+from __future__ import nested_scopes
+
+import zodb.db
+from zodb.zeo.client import ClientStorage
+from zodb.storage.mapping import MappingStorage
+from zodb.zeo.tests import forker
+from zodb.storage.tests import MinPO
+
+import os
+import random
+import sys
+import types
+
+NUM_TRANSACTIONS_PER_CONN = 10
+NUM_CONNECTIONS = 10
+NUM_ROOTS = 20
+MAX_DEPTH = 20
+MIN_OBJSIZE = 128
+MAX_OBJSIZE = 2048
+
+def an_object():
+    """Return an object suitable for a PersistentMapping key"""
+    size = random.randrange(MIN_OBJSIZE, MAX_OBJSIZE)
+    if os.path.exists("/dev/urandom"):
+        f = open("/dev/urandom")
+        buf = f.read(size)
+        f.close()
+        return buf
+    else:
+        f = open(MinPO.__file__)
+        l = list(f.read(size))
+        f.close()
+        random.shuffle(l)
+        return "".join(l)
+
+def setup(cn):
+    """Initialize the database with some objects"""
+    root = cn.root()
+    for i in range(NUM_ROOTS):
+        prev = an_object()
+        for j in range(random.randrange(1, MAX_DEPTH)):
+            o = MinPO.MinPO(prev)
+            prev = o
+        root[an_object()] = o
+        get_transaction().commit()
+    cn.close()
+
+def work(cn):
+    """Do some work with a transaction"""
+    cn.sync()
+    root = cn.root()
+    obj = random.choice(root.values())
+    # walk down to the bottom
+    while not isinstance(obj.value, types.StringType):
+        obj = obj.value
+    obj.value = an_object()
+    get_transaction().commit()
+
+def main():
+    # Yuck!  Need to cleanup forker so that the API is consistent
+    # across Unix and Windows, at least if that's possible.
+    if os.name == "nt":
+        zaddr, tport, pid = forker.start_zeo_server('MappingStorage', ())
+        def exitserver():
+            import socket
+            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            s.connect(tport)
+            s.close()
+    else:
+        zaddr = '', random.randrange(20000, 30000)
+        pid, exitobj = forker.start_zeo_server(MappingStorage(), zaddr)
+        def exitserver():
+            exitobj.close()
+
+    while 1:
+        pid = start_child(zaddr)
+        print "started", pid
+        os.waitpid(pid, 0)
+
+    exitserver()
+
+def start_child(zaddr):
+
+    pid = os.fork()
+    if pid != 0:
+        return pid
+    try:
+        _start_child(zaddr)
+    finally:
+        os._exit(0)
+
+def _start_child(zaddr):
+    storage = ClientStorage(zaddr, debug=1, min_disconnect_poll=0.5, wait=1)
+    db = zodb.db(storage, pool_size=NUM_CONNECTIONS)
+    setup(db.open())
+    conns = []
+    conn_count = 0
+
+    for i in range(NUM_CONNECTIONS):
+        c = db.open()
+        c.__count = 0
+        conns.append(c)
+        conn_count += 1
+
+    while conn_count < 25:
+        c = random.choice(conns)
+        if c.__count > NUM_TRANSACTIONS_PER_CONN:
+            conns.remove(c)
+            c.close()
+            conn_count += 1
+            c = db.open()
+            c.__count = 0
+            conns.append(c)
+        else:
+            c.__count += 1
+        work(c)
+
+if __name__ == "__main__":
+    main()


=== Zope3/src/zodb/zeo/tests/test_cache.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:54 2002
+++ Zope3/src/zodb/zeo/tests/test_cache.py	Wed Dec 25 09:12:22 2002
@@ -0,0 +1,358 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Test suite for the ZEO.ClientCache module.
+
+At times, we do 'white box' testing, i.e. we know about the internals
+of the ClientCache object.
+"""
+from __future__ import nested_scopes
+
+import os
+import time
+import tempfile
+import unittest
+
+from zodb.zeo.cache import ClientCache
+
+class ClientCacheTests(unittest.TestCase):
+
+    def setUp(self):
+        unittest.TestCase.setUp(self)
+        self.cachesize = 10*1000*1000
+        self.cache = ClientCache(size=self.cachesize)
+        self.cache.open()
+
+    def tearDown(self):
+        self.cache.close()
+        unittest.TestCase.tearDown(self)
+
+    def testOpenClose(self):
+        pass # All the work is done by setUp() / tearDown()
+
+    def testStoreLoad(self):
+        cache = self.cache
+        oid = 'abcdefgh'
+        data = '1234'*100
+        serial = 'ABCDEFGH'
+        cache.store(oid, data, serial, '', '', '')
+        loaded = cache.load(oid, '')
+        self.assertEqual(loaded, (data, serial))
+
+    def testMissingLoad(self):
+        cache = self.cache
+        oid = 'abcdefgh'
+        data = '1234'*100
+        serial = 'ABCDEFGH'
+        cache.store(oid, data, serial, '', '', '')
+        loaded = cache.load('garbage1', '')
+        self.assertEqual(loaded, None)
+
+    def testInvalidate(self):
+        cache = self.cache
+        oid = 'abcdefgh'
+        data = '1234'*100
+        serial = 'ABCDEFGH'
+        cache.store(oid, data, serial, '', '', '')
+        loaded = cache.load(oid, '')
+        self.assertEqual(loaded, (data, serial))
+        cache.invalidate(oid, '')
+        loaded = cache.load(oid, '')
+        self.assertEqual(loaded, None)
+
+    def testVersion(self):
+        cache = self.cache
+        oid = 'abcdefgh'
+        data = '1234'*100
+        serial = 'ABCDEFGH'
+        vname = 'myversion'
+        vdata = '5678'*200
+        vserial = 'IJKLMNOP'
+        cache.store(oid, data, serial, vname, vdata, vserial)
+        loaded = cache.load(oid, '')
+        self.assertEqual(loaded, (data, serial))
+        vloaded = cache.load(oid, vname)
+        self.assertEqual(vloaded, (vdata, vserial))
+
+    def testVersionOnly(self):
+        cache = self.cache
+        oid = 'abcdefgh'
+        data = ''
+        serial = ''
+        vname = 'myversion'
+        vdata = '5678'*200
+        vserial = 'IJKLMNOP'
+        cache.store(oid, data, serial, vname, vdata, vserial)
+        loaded = cache.load(oid, '')
+        self.assertEqual(loaded, None)
+        vloaded = cache.load(oid, vname)
+        self.assertEqual(vloaded, (vdata, vserial))
+
+    def testInvalidateNonVersion(self):
+        cache = self.cache
+        oid = 'abcdefgh'
+        data = '1234'*100
+        serial = 'ABCDEFGH'
+        vname = 'myversion'
+        vdata = '5678'*200
+        vserial = 'IJKLMNOP'
+        cache.store(oid, data, serial, vname, vdata, vserial)
+        loaded = cache.load(oid, '')
+        self.assertEqual(loaded, (data, serial))
+        vloaded = cache.load(oid, vname)
+        self.assertEqual(vloaded, (vdata, vserial))
+        cache.invalidate(oid, '')
+        loaded = cache.load(oid, '')
+        self.assertEqual(loaded, None)
+        # The version data is also invalidated at this point
+        vloaded = cache.load(oid, vname)
+        self.assertEqual(vloaded, None)
+
+    def testInvalidateVersion(self):
+        # Invalidating a version should not invalidate the non-version data.
+        # (This tests for the same bug as testInvalidatePersists below.)
+        cache = self.cache
+        oid = 'abcdefgh'
+        data = '1234'*100
+        serial = 'ABCDEFGH'
+        cache.store(oid, data, serial, '', '', '')
+        loaded = cache.load(oid, '')
+        self.assertEqual(loaded, (data, serial))
+        cache.invalidate(oid, 'bogus')
+        loaded = cache.load(oid, '')
+        self.assertEqual(loaded, (data, serial))
+
+    def testVerify(self):
+        cache = self.cache
+        results = []
+        def verifier(oid, serial, vserial):
+            results.append((oid, serial, vserial))
+        cache.verify(verifier)
+        self.assertEqual(results, [])
+        oid = 'abcdefgh'
+        data = '1234'*100
+        serial = 'ABCDEFGH'
+        cache.store(oid, data, serial, '', '', '')
+        results = []
+        cache.verify(verifier)
+        self.assertEqual(results, [(oid, serial, None)])
+
+    def testCheckSize(self):
+        # Make sure that cache._index[oid] is erased for oids that are
+        # stored in the cache file that's rewritten after a flip.
+        cache = self.cache
+        oid = 'abcdefgh'
+        data = '1234'*100
+        serial = 'ABCDEFGH'
+        cache.store(oid, data, serial, '', '', '')
+        cache.checkSize(10*self.cachesize) # Force a file flip
+        oid2 = 'abcdefgz'
+        data2 = '1234'*10
+        serial2 = 'ABCDEFGZ'
+        cache.store(oid2, data2, serial2, '', '', '')
+        cache.checkSize(10*self.cachesize) # Force another file flip
+        self.assertNotEqual(cache._index.get(oid2), None)
+        self.assertEqual(cache._index.get(oid), None)
+
+    def testCopyToCurrent(self):
+        # - write some objects to cache file 0
+        # - force a flip
+        # - write some objects to cache file 1
+        # - load some objects that are in cache file 0
+        # - load the same objects, making sure they are now in file 1
+        # - write some more objects
+        # - force another flip
+        # - load the same objects again
+        # - make sure they are now in file 0 again
+
+        cache = self.cache
+
+        # Create some objects
+        oid1 = 'abcdefgh'
+        data1 = '1234' * 100
+        serial1 = 'ABCDEFGH'
+        oid2 = 'bcdefghi'
+        data2 = '2345' * 200
+        serial2 = 'BCDEFGHI'
+        version2 = 'myversion'
+        nonversion = 'nada'
+        vdata2 = '5432' * 250
+        vserial2 = 'IHGFEDCB'
+        oid3 = 'cdefghij'
+        data3 = '3456' * 300
+        serial3 = 'CDEFGHIJ'
+
+        # Store them in the cache
+        cache.store(oid1, data1, serial1, '', '', '')
+        cache.store(oid2, data2, serial2, version2, vdata2, vserial2)
+        cache.store(oid3, data3, serial3, '', '', '')
+
+        # Verify that they are in file 0
+        self.assert_(None is not cache._index.get(oid1) > 0)
+        self.assert_(None is not cache._index.get(oid2) > 0)
+        self.assert_(None is not cache._index.get(oid3) > 0)
+
+        # Load them and verify that the loads return correct data
+        self.assertEqual(cache.load(oid1, ''), (data1, serial1))
+        self.assertEqual(cache.load(oid2, ''), (data2, serial2))
+        self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
+        self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
+        self.assertEqual(cache.load(oid3, ''), (data3, serial3))
+
+        # Verify that they are still in file 0
+        self.assert_(None is not cache._index.get(oid1) > 0)
+        self.assert_(None is not cache._index.get(oid2) > 0)
+        self.assert_(None is not cache._index.get(oid3) > 0)
+
+        # Cause a cache flip
+        cache.checkSize(10*self.cachesize)
+
+        # Load o1, o2, o4 again and verify that the loads return correct data
+        self.assertEqual(cache.load(oid1, ''), (data1, serial1))
+        self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
+        self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
+        self.assertEqual(cache.load(oid2, ''), (data2, serial2))
+
+        # Verify that o1, o2, 04 are now in file 1, o3 still in file 0
+        self.assert_(None is not cache._index.get(oid1) < 0)
+        self.assert_(None is not cache._index.get(oid2) < 0)
+        self.assert_(None is not cache._index.get(oid3) > 0)
+
+        # Cause another cache flip
+        cache.checkSize(10*self.cachesize)
+
+        # Load o1 and o2 again and verify that the loads return correct data
+        self.assertEqual(cache.load(oid1, ''), (data1, serial1))
+        self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
+        self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
+        self.assertEqual(cache.load(oid2, ''), (data2, serial2))
+
+        # Verify that o1 and o2 are now back in file 0, o3 is lost
+        self.assert_(None is not cache._index.get(oid1) > 0)
+        self.assert_(None is not cache._index.get(oid2) > 0)
+        self.assert_(None is cache._index.get(oid3))
+
+        # Invalidate version data for o2
+        cache.invalidate(oid2, nonversion)
+        self.assertEqual(cache.load(oid2, ''), (data2, serial2))
+        self.assertEqual(cache.load(oid2, nonversion), None)
+        self.assertEqual(cache.load(oid2, version2), None)
+
+        # Cause another cache flip
+        cache.checkSize(10*self.cachesize)
+
+        # Load o1 and o2 again and verify that the loads return correct data
+        self.assertEqual(cache.load(oid1, ''), (data1, serial1))
+        self.assertEqual(cache.load(oid2, version2), None)
+        self.assertEqual(cache.load(oid2, nonversion), None)
+        self.assertEqual(cache.load(oid2, ''), (data2, serial2))
+
+        # Verify that o1 and o2 are now in file 1
+        self.assert_(None is not cache._index.get(oid1) < 0)
+        self.assert_(None is not cache._index.get(oid2) < 0)
+
+class PersistentClientCacheTests(unittest.TestCase):
+
+    def setUp(self):
+        unittest.TestCase.setUp(self)
+        self.vardir = os.getcwd() # Don't use /tmp, it's a security risk
+        self.cachesize = 10*1000*1000
+        self.storagename = 'foo'
+        self.clientname = 'test'
+        # Predict file names
+        fn0 = 'c%s-%s-0.zec' % (self.storagename, self.clientname)
+        fn1 = 'c%s-%s-1.zec' % (self.storagename, self.clientname)
+        for fn in fn0, fn1:
+            fn = os.path.join(self.vardir, fn)
+            try:
+                os.unlink(fn)
+            except os.error:
+                pass
+        self.openCache()
+
+    def openCache(self):
+        self.cache = ClientCache(storage=self.storagename,
+                                 size=self.cachesize,
+                                 client=self.clientname,
+                                 var=self.vardir)
+        self.cache.open()
+
+    def reopenCache(self):
+        self.cache.close()
+        self.openCache()
+        return self.cache
+
+    def tearDown(self):
+        self.cache.close()
+        for filename in self.cache._p:
+            if filename is not None:
+                try:
+                    os.unlink(filename)
+                except os.error:
+                    pass
+        unittest.TestCase.tearDown(self)
+
+    def testCacheFileSelection(self):
+        # A bug in __init__ read the wrong slice of the file to determine
+        # the serial number of the first record, reading the
+        # last byte of the data size plus the first seven bytes of the
+        # serial number.  This caused random selection of the proper
+        # 'current' file when a persistent cache was opened.
+        cache = self.cache
+        self.assertEqual(cache._current, 0) # Check that file 0 is current
+        oid = 'abcdefgh'
+        data = '1234'
+        serial = 'ABCDEFGH'
+        cache.store(oid, data, serial, '', '', '')
+        cache.checkSize(10*self.cachesize) # Force a file flip
+        self.assertEqual(cache._current, 1) # Check that the flip worked
+        oid = 'abcdefgh'
+        data = '123'
+        serial = 'ABCDEFGZ'
+        cache.store(oid, data, serial, '', '', '')
+        cache = self.reopenCache()
+        loaded = cache.load(oid, '')
+        # Check that we got the most recent data:
+        self.assertEqual(loaded, (data, serial))
+        self.assertEqual(cache._current, 1) # Double check that 1 is current
+
+    def testInvalidationPersists(self):
+        # A bug in invalidate() caused invalidation to overwrite the
+        # 2nd byte of the data size on disk, rather rather than
+        # overwriting the status byte.  For certain data sizes this
+        # can be observed by reopening a persistent cache: the
+        # invalidated data will appear valid (but with altered size).
+        cache = self.cache
+        magicsize = (ord('i') + 1) << 16
+        cache = self.cache
+        oid = 'abcdefgh'
+        data = '!'*magicsize
+        serial = 'ABCDEFGH'
+        cache.store(oid, data, serial, '', '', '')
+        loaded = cache.load(oid, '')
+        self.assertEqual(loaded, (data, serial))
+        cache.invalidate(oid, '')
+        cache = self.reopenCache()
+        loaded = cache.load(oid, '')
+        if loaded != None:
+            self.fail("invalidated data resurrected, size %d, was %d" %
+                      (len(loaded[0]), len(data)))
+
+def test_suite():
+    suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(ClientCacheTests))
+    suite.addTest(unittest.makeSuite(PersistentClientCacheTests))
+    return suite
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')


=== Zope3/src/zodb/zeo/tests/test_conn.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:55 2002
+++ Zope3/src/zodb/zeo/tests/test_conn.py	Wed Dec 25 09:12:22 2002
@@ -0,0 +1,102 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Test setup for ZEO connection logic.
+
+The actual tests are in ConnectionTests.py; this file provides the
+platform-dependent scaffolding.
+"""
+
+# System imports
+import unittest
+# Import the actual test class
+from zodb.zeo.tests import connection
+
+
+class FileStorageConfig:
+    def getConfig(self, path, create, read_only):
+        return """\
+        <Storage>
+            type FileStorage
+            file_name %s
+            create %s
+            read_only %s
+        </Storage>""" % (path,
+                         create and 'yes' or 'no',
+                         read_only and 'yes' or 'no')
+
+
+class BerkeleyStorageConfig:
+    def getConfig(self, path, create, read_only):
+        # Full always creates and doesn't have a read_only flag
+        return """\
+        <Storage>
+            type BDBFullStorage
+            name %s
+            read_only %s
+        </Storage>""" % (path,
+                         read_only and 'yes' or 'no')
+
+
+class FileStorageConnectionTests(
+    FileStorageConfig,
+    connection.ConnectionTests
+    ):
+    """FileStorage-specific connection tests."""
+
+
+class FileStorageReconnectionTests(
+    FileStorageConfig,
+    connection.ReconnectionTests
+    ):
+    """FileStorage-specific re-connection tests."""
+
+
+class BDBConnectionTests(
+    BerkeleyStorageConfig,
+    connection.ConnectionTests
+    ):
+    """Berkeley storage connection tests."""
+
+
+class BDBReconnectionTests(
+    BerkeleyStorageConfig,
+    connection.ReconnectionTests
+    ):
+    """Berkeley storage re-connection tests."""
+
+
+test_classes = [FileStorageConnectionTests, FileStorageReconnectionTests]
+try:
+    from zodb.storage.bdbfull import BDBFullStorage
+except ImportError:
+    pass
+else:
+    test_classes.append(BDBConnectionTests)
+    test_classes.append(BDBReconnectionTests)
+
+
+def test_suite():
+    # shutup warnings about mktemp
+    import warnings
+    warnings.filterwarnings("ignore", "mktemp")
+
+    suite = unittest.TestSuite()
+    for klass in test_classes:
+        sub = unittest.makeSuite(klass, 'check')
+        suite.addTest(sub)
+    return suite
+
+
+if __name__ == "__main__":
+    unittest.main(defaultTest='test_suite')


=== Zope3/src/zodb/zeo/tests/test_tbuf.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:55 2002
+++ Zope3/src/zodb/zeo/tests/test_tbuf.py	Wed Dec 25 09:12:22 2002
@@ -0,0 +1,76 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+import random
+import unittest
+
+from zodb.zeo.tbuf import TransactionBuffer
+
+def random_string(size):
+    """Return a random string of size size."""
+    l = [chr(random.randrange(256)) for i in range(size)]
+    return "".join(l)
+
+def new_store_data():
+    """Return arbitrary data to use as argument to store() method."""
+    return random_string(8), '', random_string(random.randrange(1000))
+
+def new_invalidate_data():
+    """Return arbitrary data to use as argument to invalidate() method."""
+    return random_string(8), ''
+
+class TransBufTests(unittest.TestCase):
+
+    def checkTypicalUsage(self):
+        tbuf = TransactionBuffer()
+        tbuf.store(*new_store_data())
+        tbuf.invalidate(*new_invalidate_data())
+        tbuf.begin_iterate()
+        while 1:
+            o = tbuf.next()
+            if o is None:
+                break
+        tbuf.clear()
+
+    def doUpdates(self, tbuf):
+        data = []
+        for i in range(10):
+            d = new_store_data()
+            tbuf.store(*d)
+            data.append(d)
+            d = new_invalidate_data()
+            tbuf.invalidate(*d)
+            data.append(d)
+
+        tbuf.begin_iterate()
+        for i in range(len(data)):
+            x = tbuf.next()
+            if x[2] is None:
+                # the tbuf add a dummy None to invalidates
+                x = x[:2]
+            self.assertEqual(x, data[i])
+
+    def checkOrderPreserved(self):
+        tbuf = TransactionBuffer()
+        self.doUpdates(tbuf)
+
+    def checkReusable(self):
+        tbuf = TransactionBuffer()
+        self.doUpdates(tbuf)
+        tbuf.clear()
+        self.doUpdates(tbuf)
+        tbuf.clear()
+        self.doUpdates(tbuf)
+
+def test_suite():
+    return unittest.makeSuite(TransBufTests, 'check')


=== Zope3/src/zodb/zeo/tests/test_zeo.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:55 2002
+++ Zope3/src/zodb/zeo/tests/test_zeo.py	Wed Dec 25 09:12:22 2002
@@ -0,0 +1,192 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Test suite for ZEO based on zodb.tests."""
+
+# System imports
+import os
+import sys
+import time
+import socket
+import asyncore
+import tempfile
+import unittest
+import logging
+
+# ZODB test support
+import zodb
+from zodb.storage.tests.minpo import MinPO
+from zodb.storage.tests.base import zodb_unpickle
+
+
+# ZODB test mixin classes
+from zodb.storage.tests import base, basic, version, \
+     undo, undoversion, \
+     packable, synchronization, conflict, revision, \
+     mt, readonly
+
+# ZEO imports
+from zodb.zeo.client import ClientStorage
+from zodb.zeo.interfaces import Disconnected
+
+# ZEO test support
+from zodb.zeo.tests import forker, cache
+
+# ZEO test mixin classes
+from zodb.zeo.tests import commitlock, threadtests
+
+class DummyDB:
+    def invalidate(self, *args):
+        pass
+
+
+class MiscZEOTests:
+    """ZEO tests that don't fit in elsewhere."""
+
+    def checkLargeUpdate(self):
+        obj = MinPO("X" * (10 * 128 * 1024))
+        self._dostore(data=obj)
+
+    def checkZEOInvalidation(self):
+        addr = self._storage._addr
+        storage2 = ClientStorage(addr, wait=1, min_disconnect_poll=0.1)
+        try:
+            oid = self._storage.new_oid()
+            ob = MinPO('first')
+            revid1 = self._dostore(oid, data=ob)
+            data, serial = storage2.load(oid, '')
+            self.assertEqual(zodb_unpickle(data), MinPO('first'))
+            self.assertEqual(serial, revid1)
+            revid2 = self._dostore(oid, data=MinPO('second'), revid=revid1)
+            for n in range(3):
+                # Let the server and client talk for a moment.
+                # Is there a better way to do this?
+                asyncore.poll(0.1)
+            data, serial = storage2.load(oid, '')
+            self.assertEqual(zodb_unpickle(data), MinPO('second'),
+                             'Invalidation message was not sent!')
+            self.assertEqual(serial, revid2)
+        finally:
+            storage2.close()
+
+
+class GenericTests(
+    # Base class for all ZODB tests
+    base.StorageTestBase,
+    # ZODB test mixin classes (in the same order as imported)
+    basic.BasicStorage,
+    version.VersionStorage,
+    undo.TransactionalUndoStorage,
+    undoversion.TransactionalUndoVersionStorage,
+    packable.PackableStorage,
+    synchronization.SynchronizedStorage,
+    conflict.ConflictResolvingStorage,
+    conflict.ConflictResolvingTransUndoStorage,
+    revision.RevisionStorage,
+    mt.MTStorage,
+    readonly.ReadOnlyStorage,
+    # ZEO test mixin classes (in the same order as imported)
+    cache.StorageWithCache,
+    cache.TransUndoStorageWithCache,
+    commitlock.CommitLockTests,
+    threadtests.ThreadTests,
+    # Locally defined (see above)
+    MiscZEOTests
+    ):
+
+    """Combine tests from various origins in one class."""
+
+    def open(self, read_only=0):
+        # XXX Needed to support ReadOnlyStorage tests.  Ought to be a
+        # cleaner way.
+        addr = self._storage._addr
+        self._storage.close()
+        self._storage = ClientStorage(addr, read_only=read_only, wait=1)
+
+    def unresolvable(self, klass):
+        # This helper method is used to test the implementation of
+        # conflict resolution.  That code runs in the server, and there
+        # is no way for the test suite (a client) to inquire about it.
+        pass
+
+
+class FileStorageTests(GenericTests):
+    """Test ZEO backed by a FileStorage."""
+
+    def setUp(self):
+        logging.info("testZEO: setUp() %s", self.id())
+        zeoport, adminaddr, pid = forker.start_zeo_server(self.getConfig())
+        self._pids = [pid]
+        self._servers = [adminaddr]
+        self._storage = ClientStorage(zeoport, '1', cache_size=20000000,
+                                      min_disconnect_poll=0.5, wait=1)
+        self._storage.registerDB(DummyDB())
+
+    def tearDown(self):
+        self._storage.close()
+        for server in self._servers:
+            forker.shutdown_zeo_server(server)
+        if hasattr(os, 'waitpid'):
+            # Not in Windows Python until 2.3
+            for pid in self._pids:
+                os.waitpid(pid, 0)
+
+    def getConfig(self):
+        filename = self.__fs_base = tempfile.mktemp()
+        # Return a 1-tuple
+        return """\
+        <Storage>
+            type FileStorage
+            file_name %s
+            create yes
+        </Storage>
+        """ % filename
+
+
+class BDBTests(FileStorageTests):
+    """ZEO backed by a Berkeley Full storage."""
+
+    def getStorage(self):
+        self._envdir = tempfile.mktemp()
+        # Return a 1-tuple
+        return """\
+        <Storage>
+            type BDBFullStorage
+            name %s
+        </Storage>
+        """ % self._envdir
+
+
+test_classes = [FileStorageTests]
+try:
+    from zodb.storage.bdbfull import BDBFullStorage
+except ImportError:
+    pass
+else:
+    test_classes.append(BDBTests)
+
+
+def test_suite():
+    # shutup warnings about mktemp
+    import warnings
+    warnings.filterwarnings("ignore", "mktemp")
+
+    suite = unittest.TestSuite()
+    for klass in test_classes:
+        sub = unittest.makeSuite(klass, 'check')
+        suite.addTest(sub)
+    return suite
+
+
+if __name__ == "__main__":
+    unittest.main(defaultTest='test_suite')


=== Zope3/src/zodb/zeo/tests/threadtests.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:55 2002
+++ Zope3/src/zodb/zeo/tests/threadtests.py	Wed Dec 25 09:12:22 2002
@@ -0,0 +1,169 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Compromising positions involving threads."""
+
+import threading
+
+from zodb.ztransaction import Transaction
+from zodb.storage.tests.base import zodb_pickle, MinPO
+
+from zodb.zeo.client import ClientStorageError
+from zodb.zeo.interfaces import Disconnected
+
+ZERO = '\0'*8
+
+class BasicThread(threading.Thread):
+    def __init__(self, storage, doNextEvent, threadStartedEvent):
+        self.storage = storage
+        self.trans = Transaction()
+        self.doNextEvent = doNextEvent
+        self.threadStartedEvent = threadStartedEvent
+        self.gotValueError = 0
+        self.gotDisconnected = 0
+        threading.Thread.__init__(self)
+        self.setDaemon(1)
+
+    def join(self):
+        threading.Thread.join(self, 10)
+        assert not self.isAlive()
+
+
+class GetsThroughVoteThread(BasicThread):
+    # This thread gets partially through a transaction before it turns
+    # execution over to another thread.  We're trying to establish that a
+    # tpc_finish() after a storage has been closed by another thread will get
+    # a ClientStorageError error.
+    #
+    # This class gets does a tpc_begin(), store(), tpc_vote() and is waiting
+    # to do the tpc_finish() when the other thread closes the storage.
+    def run(self):
+        self.storage.tpc_begin(self.trans)
+        oid = self.storage.new_oid()
+        self.storage.store(oid, ZERO, zodb_pickle(MinPO("c")), '', self.trans)
+        self.storage.tpc_vote(self.trans)
+        self.threadStartedEvent.set()
+        self.doNextEvent.wait(10)
+        try:
+            self.storage.tpc_finish(self.trans)
+        except ClientStorageError:
+            self.gotValueError = 1
+            self.storage.tpc_abort(self.trans)
+
+
+class GetsThroughBeginThread(BasicThread):
+    # This class is like the above except that it is intended to be run when
+    # another thread is already in a tpc_begin().  Thus, this thread will
+    # block in the tpc_begin until another thread closes the storage.  When
+    # that happens, this one will get disconnected too.
+    def run(self):
+        try:
+            self.storage.tpc_begin(self.trans)
+        except ClientStorageError:
+            self.gotValueError = 1
+
+
+class AbortsAfterBeginFailsThread(BasicThread):
+    # This class is identical to GetsThroughBeginThread except that it
+    # attempts to tpc_abort() after the tpc_begin() fails.  That will raise a
+    # ClientDisconnected exception which implies that we don't have the lock,
+    # and that's what we really want to test (but it's difficult given the
+    # threading module's API).
+    def run(self):
+        try:
+            self.storage.tpc_begin(self.trans)
+        except ClientStorageError:
+            self.gotValueError = 1
+        try:
+            self.storage.tpc_abort(self.trans)
+        except Disconnected:
+            self.gotDisconnected = 1
+
+
+class ThreadTests:
+    # Thread 1 should start a transaction, but not get all the way through it.
+    # Main thread should close the connection.  Thread 1 should then get
+    # disconnected.
+    def checkDisconnectedOnThread2Close(self):
+        doNextEvent = threading.Event()
+        threadStartedEvent = threading.Event()
+        thread1 = GetsThroughVoteThread(self._storage,
+                                        doNextEvent, threadStartedEvent)
+        thread1.start()
+        threadStartedEvent.wait(10)
+        self._storage.close()
+        doNextEvent.set()
+        thread1.join()
+        self.assertEqual(thread1.gotValueError, 1)
+
+    # Thread 1 should start a transaction, but not get all the way through
+    # it.  While thread 1 is in the middle of the transaction, a second thread
+    # should start a transaction, and it will block in the tcp_begin() --
+    # because thread 1 has acquired the lock in its tpc_begin().  Now the main
+    # thread closes the storage and both sub-threads should get disconnected.
+    def checkSecondBeginFails(self):
+        doNextEvent = threading.Event()
+        threadStartedEvent = threading.Event()
+        thread1 = GetsThroughVoteThread(self._storage,
+                                        doNextEvent, threadStartedEvent)
+        thread2 = GetsThroughBeginThread(self._storage,
+                                         doNextEvent, threadStartedEvent)
+        thread1.start()
+        threadStartedEvent.wait(1)
+        thread2.start()
+        self._storage.close()
+        doNextEvent.set()
+        thread1.join()
+        thread2.join()
+        self.assertEqual(thread1.gotValueError, 1)
+        self.assertEqual(thread2.gotValueError, 1)
+
+    def checkThatFailedBeginDoesNotHaveLock(self):
+        doNextEvent = threading.Event()
+        threadStartedEvent = threading.Event()
+        thread1 = GetsThroughVoteThread(self._storage,
+                                        doNextEvent, threadStartedEvent)
+        thread2 = AbortsAfterBeginFailsThread(self._storage,
+                                              doNextEvent, threadStartedEvent)
+        thread1.start()
+        threadStartedEvent.wait(1)
+        thread2.start()
+        self._storage.close()
+        doNextEvent.set()
+        thread1.join()
+        thread2.join()
+        self.assertEqual(thread1.gotValueError, 1)
+        self.assertEqual(thread2.gotValueError, 1)
+        self.assertEqual(thread2.gotDisconnected, 1)
+
+    # Run a bunch of threads doing small and large stores in parallel
+    def checkMTStores(self):
+        threads = []
+        for i in range(5):
+            t = threading.Thread(target=self.mtstorehelper)
+            threads.append(t)
+            t.start()
+        for t in threads:
+            t.join(30)
+        for i in threads:
+            self.failUnless(not t.isAlive())
+
+    # Helper for checkMTStores
+    def mtstorehelper(self):
+        name = threading.currentThread().getName()
+        objs = []
+        for i in range(10):
+            objs.append(MinPO("X" * 200000))
+            objs.append(MinPO("X"))
+        for obj in objs:
+            self._dostore(data=obj)


=== Zope3/src/zodb/zeo/tests/zeoserver.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:13:55 2002
+++ Zope3/src/zodb/zeo/tests/zeoserver.py	Wed Dec 25 09:12:22 2002
@@ -0,0 +1,161 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Helper file used to launch a ZEO server cross platform"""
+
+import os
+import sys
+import errno
+import getopt
+import random
+import socket
+import logging
+import asyncore
+
+import ZConfig
+from zodb import config
+import zodb.zeo.server
+from zodb.zeo import threadedasync
+
+
+def load_storage(fp):
+    rootconf = ZConfig.loadfile(fp)
+    storageconf = rootconf.getSection('Storage')
+    return config.createStorage(storageconf)
+
+
+def cleanup(storage):
+    # FileStorage and the Berkeley storages have this method, which deletes
+    # all files and directories used by the storage.  This prevents @-files
+    # from clogging up /tmp
+    try:
+        storage.cleanup()
+    except AttributeError:
+        pass
+
+
+class ZEOTestServer(asyncore.dispatcher):
+    """A server for killing the whole process at the end of a test.
+
+    The first time we connect to this server, we write an ack character down
+    the socket.  The other end should block on a recv() of the socket so it
+    can guarantee the server has started up before continuing on.
+
+    The second connect to the port immediately exits the process, via
+    os._exit(), without writing data on the socket.  It does close and clean
+    up the storage first.  The other end will get the empty string from its
+    recv() which will be enough to tell it that the server has exited.
+
+    I think this should prevent us from ever getting a legitimate addr-in-use
+    error.
+    """
+    __super_init = asyncore.dispatcher.__init__
+
+    def __init__(self, addr, storage, keep):
+        self.__super_init()
+        self._storage = storage
+        self._keep = keep
+        # Count down to zero, the number of connects
+        self._count = 1
+        # Create a logger
+        self.logger = logging.getLogger('zeoserver.%d.%s' %
+                                        (os.getpid(), addr))
+        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        # Some ZEO tests attempt a quick start of the server using the same
+        # port so we have to set the reuse flag.
+        self.set_reuse_addr()
+        try:
+            self.bind(addr)
+        except:
+            # We really want to see these exceptions
+            import traceback
+            traceback.print_exc()
+            raise
+        self.listen(5)
+        self.logger.info('bound and listening')
+
+    def handle_accept(self):
+        sock, addr = self.accept()
+        self.logger.info('in handle_accept()')
+        # When we're done with everything, close the storage.  Do not write
+        # the ack character until the storage is finished closing.
+        if self._count <= 0:
+            self.logger.info('closing the storage')
+            self._storage.close()
+            if not self._keep:
+                cleanup(self._storage)
+            self.logger.info('exiting')
+            os._exit(0)
+        self.logger.info('continuing')
+        sock.send('X')
+        self._count -= 1
+
+
+def main():
+
+    # Initialize the logging module.
+    import logging.config
+    logging.basicConfig()
+    level = os.getenv("LOGGING")
+    if level:
+        level = int(level)
+    else:
+        level = logging.CRITICAL
+    logging.root.setLevel(level)
+
+    # Create a logger
+    logger = logging.getLogger('zeoserver.%d' % os.getpid())
+    logger.info('starting')
+
+    # We don't do much sanity checking of the arguments, since if we get it
+    # wrong, it's a bug in the test suite.
+    ro_svr = False
+    keep = False
+    configfile = None
+    # Parse the arguments and let getopt.error percolate
+    opts, args = getopt.getopt(sys.argv[1:], 'rkC:')
+    for opt, arg in opts:
+        if opt == '-r':
+            ro_svr = True
+        elif opt == '-k':
+            keep = True
+        elif opt == '-C':
+            configfile = arg
+    # Open the config file and let ZConfig parse the data there.  Then remove
+    # the config file, otherwise we'll leave turds.
+    fp = open(configfile, 'r')
+    storage = load_storage(fp)
+    fp.close()
+    os.remove(configfile)
+    # The rest of the args are hostname, portnum
+    zeo_port = int(args[0])
+    test_port = zeo_port + 1
+    try:
+        logger.info('creating the test server, ro: %s, keep: %s',
+                    ro_svr, keep)
+        t = ZEOTestServer(('', test_port), storage, keep)
+    except socket.error, e:
+        if e[0] <> errno.EADDRINUSE: raise
+        logger.info('addr in use, closing and exiting')
+        storage.close()
+        cleanup(storage)
+        sys.exit(2)
+    addr = ('', zeo_port)
+    logger.info('creating the storage server')
+    serv = zodb.zeo.server.StorageServer(addr, {'1': storage}, ro_svr)
+    logger.info('entering threadedasync loop')
+    threadedasync.loop()
+
+
+if __name__ == '__main__':
+    main()