[Zodb-checkins] CVS: ZEO/ZEO - ClientStub.py:1.4 CommitLog.py:1.2 Exceptions.py:1.4 ICache.py:1.2 ServerStub.py:1.4 TransactionBuffer.py:1.4 ClientCache.py:1.23 ClientStorage.py:1.41 StorageServer.py:1.37 __init__.py:1.9 smac.py:1.17 start.py:1.33 trigger.py:1.6

Jeremy Hylton jeremy@zope.com
Tue, 11 Jun 2002 09:43:37 -0400


Update of /cvs-repository/ZEO/ZEO
In directory cvs.zope.org:/tmp/cvs-serv5548/ZEO

Modified Files:
	ClientCache.py ClientStorage.py StorageServer.py __init__.py 
	smac.py start.py trigger.py 
Added Files:
	ClientStub.py CommitLog.py Exceptions.py ICache.py 
	ServerStub.py TransactionBuffer.py 
Log Message:
Merge ZEO2-branch to trunk.


=== ZEO/ZEO/ClientStub.py 1.3 => 1.4 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
+"""Stub for interface exported by ClientStorage"""
+
+class ClientStorage:
+    def __init__(self, rpc):
+        self.rpc = rpc
+
+    def beginVerify(self):
+        self.rpc.callAsync('begin')
+
+    # XXX must rename the two invalidate messages.  I can never
+    # remember which is which
+
+    def invalidate(self, args):
+        self.rpc.callAsync('invalidate', args)
+
+    def Invalidate(self, args):
+        self.rpc.callAsync('Invalidate', args)
+
+    def endVerify(self):
+        self.rpc.callAsync('end')
+
+    def serialnos(self, arg):
+        self.rpc.callAsync('serialnos', arg)
+
+    def info(self, arg):
+        self.rpc.callAsync('info', arg)

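The stub above simply packages each call as a one-way zrpc message and never waits for a reply. A minimal usage sketch, assuming a hypothetical connection object `conn` that provides callAsync(name, *args) the way zrpc connections do (the helper name is illustrative, not part of this checkin):

    # Illustrative sketch only; `conn` stands in for a zrpc connection.
    from ZEO.ClientStub import ClientStorage

    def push_invalidations(conn, invalidated):
        client = ClientStorage(conn)    # wrap the connection in the stub
        client.Invalidate(invalidated)  # fire-and-forget; no reply expected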

=== ZEO/ZEO/CommitLog.py 1.1 => 1.2 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
+"""Log a transaction's commit info during two-phase commit.
+
+A storage server allows multiple clients to commit transactions, but
+must serialize them as they actually execute at the server.  The
+concurrent commits are achieved by logging actions up until the
+tpc_vote().  At that point, the entire transaction is committed on the
+real storage.
+"""
+import cPickle
+import tempfile
+
+class CommitLog:
+
+    def __init__(self):
+        self.file = tempfile.TemporaryFile(suffix=".log")
+        self.pickler = cPickle.Pickler(self.file, 1)
+        self.pickler.fast = 1
+        self.stores = 0
+        self.read = 0
+
+    def tpc_begin(self, t, tid, status):
+        self.t = t
+        self.tid = tid
+        self.status = status
+
+    def store(self, oid, serial, data, version):
+        self.pickler.dump((oid, serial, data, version))
+        self.stores += 1
+
+    def get_loader(self):
+        self.read = 1
+        self.file.seek(0)
+        return self.stores, cPickle.Unpickler(self.file)
+

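In other words, a transaction that cannot yet run against the real storage has its stores appended to the log and replayed later, once the storage is available. A rough sketch of that replay loop, using only the methods defined above (`storage` and `txn` are assumptions standing in for any ZODB storage and its current transaction):

    # Sketch: replay a CommitLog's buffered stores into a real storage.
    def replay(log, storage, txn):
        # tpc_begin() and store() were called on the log while blocked.
        count, loader = log.get_loader()
        for i in range(count):
            oid, serial, data, version = loader.load()
            storage.store(oid, serial, data, version, txn)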

=== ZEO/ZEO/Exceptions.py 1.3 => 1.4 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
+"""Exceptions for ZEO."""
+
+class Disconnected(Exception):
+    """Exception raised when a ZEO client is disconnected from the
+    ZEO server."""


=== ZEO/ZEO/ICache.py 1.1 => 1.2 ===
+try:
+    from Interface import Base
+except ImportError:
+    class Base:
+        # a dummy interface for use when Zope's is unavailable
+        pass
+
+class ICache(Base):
+    """ZEO client cache.
+
+    __init__(storage, size, client, var)
+
+    All arguments optional.
+
+    storage -- name of storage
+    size -- max size of cache in bytes
+    client -- a string; if specified, cache is persistent.
+    var -- var directory to store cache files in
+    """
+    
+    def open():
+        """Returns a sequence of object info tuples.
+
+        An object info tuple is a pair containing an object id and a
+        pair of serialnos, a non-version serialno and a version serialno:
+        oid, (serial, ver_serial)
+
+        This method builds an index of the cache and returns a
+        sequence used for cache validation.
+        """
+
+    def close():
+        """Closes the cache."""
+
+    def verify(func):
+        """Call func on every object in cache.
+
+        func is called with three arguments
+        func(oid, serial, ver_serial)
+        """
+
+    def invalidate(oid, version):
+        """Remove object from cache."""
+
+    def load(oid, version):
+        """Load object from cache.
+
+        Return None if object not in cache.
+        Return data, serialno if object is in cache.
+        """
+
+    def store(oid, p, s, version, pv, sv):
+        """Store a new object in the cache."""
+
+    def update(oid, serial, version, data):
+        """Update an object already in the cache.
+
+        XXX This method is called to update objects that were modified by
+        a transaction.  It's likely that the object is already in the cache,
+        and it may be possible for the implementation to operate more
+        efficiently.
+        """
+
+    def modifiedInVersion(oid):
+        """Return the version an object is modified in.
+
+        '' signifies the trunk.
+        Returns None if the object is not in the cache.
+        """
+
+    def checkSize(size):
+        """Check if adding size bytes would exceed cache limit.
+
+        This method is often called just before store or update.  The
+        size is a hint about the amount of data that is about to be
+        stored.  The cache may want to evict some data to make space.
+        """
+

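To make the contract concrete, here is a deliberately tiny, non-persistent implementation sketch. It ignores versions and eviction entirely and is not part of this checkin; it only illustrates the method signatures in use:

    # Minimal in-memory sketch of the ICache interface (illustrative only).
    from ZEO.ICache import ICache

    class TrivialCache:

        __implements__ = ICache

        def __init__(self, storage='', size=20000000, client=None, var=None):
            self._data = {}  # oid -> (data, serial)

        def open(self):
            return []  # nothing on disk, so nothing to verify

        def close(self):
            self._data.clear()

        def verify(self, func):
            for oid, (data, serial) in self._data.items():
                func(oid, serial, None)

        def invalidate(self, oid, version):
            if self._data.has_key(oid):
                del self._data[oid]

        def load(self, oid, version):
            if version:
                return None  # versions are never cached here
            return self._data.get(oid)  # None or (data, serial)

        def store(self, oid, p, s, version, pv, sv):
            if not version:
                self._data[oid] = (p, s)

        def update(self, oid, serial, version, data):
            self.store(oid, data, serial, version, None, None)

        def modifiedInVersion(self, oid):
            if self._data.has_key(oid):
                return ''
            return None

        def checkSize(self, size):
            pass  # unbounded; a real cache would evict here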

=== ZEO/ZEO/ServerStub.py 1.3 => 1.4 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
+"""Stub for interface exposed by StorageServer"""
+
+class StorageServer:
+
+    def __init__(self, rpc):
+        self.rpc = rpc
+
+    def register(self, storage_name, read_only):
+        self.rpc.call('register', storage_name, read_only)
+
+    def get_info(self):
+        return self.rpc.call('get_info')
+
+    def get_size_info(self):
+        return self.rpc.call('get_size_info')
+
+    def beginZeoVerify(self):
+        self.rpc.callAsync('beginZeoVerify')
+
+    def zeoVerify(self, oid, s, sv):
+        self.rpc.callAsync('zeoVerify', oid, s, sv)
+
+    def endZeoVerify(self):
+        self.rpc.callAsync('endZeoVerify')
+
+    def new_oids(self, n=None):
+        if n is None:
+            return self.rpc.call('new_oids')
+        else:
+            return self.rpc.call('new_oids', n)
+
+    def pack(self, t, wait=None):
+        if wait is None:
+            self.rpc.call('pack', t)
+        else:
+            self.rpc.call('pack', t, wait)
+
+    def zeoLoad(self, oid):
+        return self.rpc.call('zeoLoad', oid)
+
+    def storea(self, oid, serial, data, version, id):
+        self.rpc.callAsync('storea', oid, serial, data, version, id)
+
+    def tpc_begin(self, id, user, descr, ext, tid, status):
+        return self.rpc.call('tpc_begin', id, user, descr, ext, tid, status)
+
+    def vote(self, trans_id):
+        return self.rpc.call('vote', trans_id)
+
+    def tpc_finish(self, id):
+        return self.rpc.call('tpc_finish', id)
+
+    def tpc_abort(self, id):
+        self.rpc.callAsync('tpc_abort', id)
+
+    def abortVersion(self, src, id):
+        return self.rpc.call('abortVersion', src, id)
+
+    def commitVersion(self, src, dest, id):
+        return self.rpc.call('commitVersion', src, dest, id)
+
+    def history(self, oid, version, length=None):
+        if length is None:
+            return self.rpc.call('history', oid, version)
+        else:
+            return self.rpc.call('history', oid, version, length)
+
+    def load(self, oid, version):
+        return self.rpc.call('load', oid, version)
+
+    def loadSerial(self, oid, serial):
+        return self.rpc.call('loadSerial', oid, serial)
+
+    def modifiedInVersion(self, oid):
+        return self.rpc.call('modifiedInVersion', oid)
+
+    def new_oid(self, last=None):
+        if last is None:
+            return self.rpc.call('new_oid')
+        else:
+            return self.rpc.call('new_oid', last)
+
+    def store(self, oid, serial, data, version, trans):
+        return self.rpc.call('store', oid, serial, data, version, trans)
+
+    def transactionalUndo(self, trans_id, trans):
+        return self.rpc.call('transactionalUndo', trans_id, trans)
+
+    def undo(self, trans_id):
+        return self.rpc.call('undo', trans_id)
+
+    def undoLog(self, first, last):
+        # XXX filter not allowed across RPC
+        return self.rpc.call('undoLog', first, last)
+
+    def undoInfo(self, first, last, spec):
+        return self.rpc.call('undoInfo', first, last, spec)
+
+    def versionEmpty(self, vers):
+        return self.rpc.call('versionEmpty', vers)
+
+    def versions(self, max=None):
+        if max is None:
+            return self.rpc.call('versions')
+        else:
+            return self.rpc.call('versions', max)


=== ZEO/ZEO/TransactionBuffer.py 1.3 => 1.4 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
+"""A TransactionBuffer store transaction updates until commit or abort.
+
+A transaction may generate enough data that it is not practical to
+always hold pending updates in memory.  Instead, a TransactionBuffer
+is used to store the data until a commit or abort.
+"""
+
+# A faster implementation might store trans data in memory until it
+# reaches a certain size.
+
+import tempfile
+import cPickle
+
+class TransactionBuffer:
+
+    def __init__(self):
+        self.file = tempfile.TemporaryFile(suffix=".tbuf")
+        self.count = 0
+        self.size = 0
+        # It's safe to use a fast pickler because the only objects
+        # stored are builtin types -- strings or None.
+        self.pickler = cPickle.Pickler(self.file, 1)
+        self.pickler.fast = 1
+
+    def close(self): 
+        try:
+            self.file.close()
+        except OSError:
+            pass
+        
+
+    def store(self, oid, version, data):
+        """Store oid, version, data for later retrieval"""
+        self.pickler.dump((oid, version, data))
+        self.count += 1
+        # Estimate per-record cache size
+        self.size = self.size + len(data) + (27 + 12)
+        if version:
+            self.size = self.size + len(version) + 4
+
+    def invalidate(self, oid, version):
+        self.pickler.dump((oid, version, None))
+        self.count += 1
+
+    def clear(self):
+        """Mark the buffer as empty"""
+        self.file.seek(0)
+        self.count = 0
+        self.size = 0
+
+    # unchecked constraints:
+    # 1. can't call store() after begin_iterate()
+    # 2. must call clear() after iteration finishes
+
+    def begin_iterate(self):
+        """Move the file pointer in advance of iteration"""
+        self.file.flush()
+        self.file.seek(0)
+        self.unpickler = cPickle.Unpickler(self.file)
+
+    def next(self):
+        """Return next tuple of data or None if EOF"""
+        if self.count == 0:
+            del self.unpickler
+            return None
+        oid_ver_data = self.unpickler.load()
+        self.count -= 1
+        return oid_ver_data
+
+    def get_size(self):
+        """Return size of data stored in buffer (just a hint)."""
+
+        return self.size

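The expected life cycle is store() and invalidate() while the transaction runs, then a single pass with begin_iterate()/next() at commit time, observing the two unchecked constraints noted above. A short sketch of that pattern (the oids below are made up):

    # Sketch of the store-then-iterate usage.
    from ZEO.TransactionBuffer import TransactionBuffer

    tbuf = TransactionBuffer()
    tbuf.store('\0' * 8, '', 'pickled state')  # oid, version, data
    tbuf.invalidate('\0' * 7 + '\1', '')       # data of None marks invalidation

    tbuf.begin_iterate()                       # no store() calls after this
    while 1:
        rec = tbuf.next()
        if rec is None:
            break
        oid, version, data = rec
    tbuf.clear()                               # required once iteration is done
    tbuf.close()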

=== ZEO/ZEO/ClientCache.py 1.22 => 1.23 === (447/547 lines abridged)
 ##############################################################################
 """Implement a client cache
- 
+
 The cache is managed as two files, var/c0.zec and var/c1.zec.
 
 Each cache file is a sequence of records of the form:
@@ -75,143 +75,181 @@
 
 __version__ = "$Revision$"[11:-2]
 
-import os, tempfile
+import os
+import sys
+import tempfile
 from struct import pack, unpack
 from thread import allocate_lock
-import zLOG
 
-magic='ZEC0'
+import zLOG
+from ZEO.ICache import ICache
 
-def LOG(msg, level=zLOG.BLATHER):
+def log(msg, level=zLOG.INFO):
     zLOG.LOG("ZEC", level, msg)
 
+magic='ZEC0'
+
 class ClientCache:
 
+    __implements__ = ICache
+
     def __init__(self, storage='', size=20000000, client=None, var=None):
 
         # Allocate locks:
-        l=allocate_lock()
-        self._acquire=l.acquire
-        self._release=l.release
+        L = allocate_lock()
+        self._acquire = L.acquire
+        self._release = L.release
 
         if client:
             # Create a persistent cache
             if var is None:
-                try: var=CLIENT_HOME
+                try:
+                    var = CLIENT_HOME
                 except:

[-=- -=- -=- 447 lines omitted -=- -=- -=-]

-        else: vs=None
+            vs = read(8)
+            if read(4) != h[9:13]:
+                break
+        else:
+            vs = None
 
         if h[8] in 'vn':
-            if current: index[oid]=-pos
-            else: index[oid]=pos
-            serial[oid]=h[-8:], vs
+            if current:
+                index[oid] = -pos
+            else:
+                index[oid] = pos
+            serial[oid] = h[-8:], vs
         else:
             if serial.has_key(oid):
                 # We have a record for this oid, but it was invalidated!
                 del serial[oid]
                 del index[oid]
-            
-            
-        pos=pos+tlen
+
+
+        pos = pos + tlen
 
     f.seek(pos)
-    try: f.truncate()
-    except: pass
-    
-    return pos
+    try:
+        f.truncate()
+    except:
+        pass
 
-def main(files):
-    for file in files:
-        print file
-        index = {}
-        serial = {}
-        read_index(index, serial, open(file), 0)
-        print index.keys()
-
-if __name__ == "__main__":
-    import sys
-    main(sys.argv[1:])
+    return pos


=== ZEO/ZEO/ClientStorage.py 1.40 => 1.41 === (866/966 lines abridged)
 __version__='$Revision$'[11:-2]
 
-import struct, time, os, socket, string
-import tempfile, thread
-from struct import pack, unpack
-from types import TupleType
+import cPickle
+import os
+import tempfile
+import threading
+import time
+
+from ZEO import ClientCache, ServerStub
+from ZEO.TransactionBuffer import TransactionBuffer
+from ZEO.Exceptions import Disconnected
+from ZEO.zrpc.client import ConnectionManager
 
-import Invalidator, ExtensionClass
-import ThreadedAsync, Sync, zrpc, ClientCache
-
-from ZODB import POSException, BaseStorage
+from ZODB import POSException
 from ZODB.TimeStamp import TimeStamp
+from zLOG import LOG, PROBLEM, INFO, BLATHER
 
-from ZEO.logger import zLogger
-
-log = zLogger("ZEO Client")
+def log2(type, msg, subsys="ClientStorage %d" % os.getpid()):
+    LOG(subsys, type, msg)
 
 try:
     from ZODB.ConflictResolution import ResolvedSerial
-except:
-    ResolvedSerial='rs'
+except ImportError:
+    ResolvedSerial = 'rs'
 
 class ClientStorageError(POSException.StorageError):
     """An error occured in the ZEO Client Storage"""
 
 class UnrecognizedResult(ClientStorageError):
-    """A server call returned an unrecognized result
-    """
+    """A server call returned an unrecognized result"""
 
-class ClientDisconnected(ClientStorageError):
-    """The database storage is disconnected from the storage.
-    """
+class ClientDisconnected(ClientStorageError, Disconnected):

[-=- -=- -=- 866 lines omitted -=- -=- -=-]

-    _w.append(t)
-    return t
+        return self._server.versions(max)
+
+    # below are methods invoked by the StorageServer
+
+    def serialnos(self, args):
+        self._serials.extend(args)
+
+    def info(self, dict):
+        self._info.update(dict)
+
+    def begin(self):
+        self._tfile = tempfile.TemporaryFile(suffix=".inv")
+        self._pickler = cPickle.Pickler(self._tfile, 1)
+        self._pickler.fast = 1 # Don't use the memo
+
+    def invalidate(self, args):
+        # Queue an invalidate for the end of the transaction
+        if self._pickler is None:
+            return
+        self._pickler.dump(args)
+
+    def end(self):
+        if self._pickler is None:
+            return
+        self._pickler.dump((0,0))
+        self._tfile.seek(0)
+        unpick = cPickle.Unpickler(self._tfile)
+        f = self._tfile
+        self._tfile = None
+
+        while 1:
+            oid, version = unpick.load()
+            if not oid:
+                break
+            self._cache.invalidate(oid, version=version)
+            self._db.invalidate(oid, version=version)
+        f.close()
+
+    def Invalidate(self, args):
+        for oid, version in args:
+            self._cache.invalidate(oid, version=version)
+            try:
+                self._db.invalidate(oid, version=version)
+            except AttributeError, msg:
+                log2(PROBLEM,
+                    "Invalidate(%s, %s) failed for _db: %s" % (repr(oid),
+                                                               repr(version),
+                                                               msg))

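The begin()/invalidate()/end() trio above is effectively a disk-backed queue: invalidations that arrive while the cache is being verified are pickled to a temporary file and only applied when the server signals that verification is over. The same pattern in isolation, with illustrative names and a None sentinel in place of the (0, 0) record used above:

    # Generic sketch of the queue-then-drain pattern (not part of the checkin).
    import cPickle
    import tempfile

    class InvalidationQueue:

        def __init__(self):
            self._file = tempfile.TemporaryFile()
            self._pickler = cPickle.Pickler(self._file, 1)
            self._pickler.fast = 1  # no memo; entries are independent

        def put(self, oid, version):
            self._pickler.dump((oid, version))

        def drain(self, apply_func):
            self._pickler.dump((None, None))  # sentinel marking the end
            self._file.seek(0)
            unpickler = cPickle.Unpickler(self._file)
            while 1:
                oid, version = unpickler.load()
                if oid is None:
                    break
                apply_func(oid, version)
            self._file.close()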

=== ZEO/ZEO/StorageServer.py 1.36 => 1.37 === (1035/1135 lines abridged)
 # 
 ##############################################################################
+"""Network ZODB storage server
 
-__version__ = "$Revision$"[11:-2]
+This server acts as a front-end for one or more real storages, like
+file storage or Berkeley storage.
 
-import asyncore, socket, string, sys, os
-import cPickle
-from cPickle import Unpickler
-from cStringIO import StringIO
-from thread import start_new_thread
-import time
-from types import StringType
+XXX Need some basic access control-- a declaration of the methods
+exported for invocation by the server.
+"""
 
-from ZODB import POSException
-from ZODB.POSException import TransactionError, UndoError, VersionCommitError
-from ZODB.Transaction import Transaction
+import asyncore
+import cPickle
+import os
+import sys
+import threading
+
+from ZEO import ClientStub
+from ZEO.CommitLog import CommitLog
+from ZEO.zrpc.server import Dispatcher
+from ZEO.zrpc.connection import ManagedServerConnection, Delay
+
+import zLOG
+from ZODB.POSException import StorageError, StorageTransactionError, \
+     TransactionError, ReadOnlyError
 from ZODB.referencesf import referencesf
-from ZODB.utils import U64
-
-from ZEO import trigger
-from ZEO import asyncwrap
-from ZEO.smac import Disconnected, SizedMessageAsyncConnection
-from ZEO.logger import zLogger, format_msg
-
-class StorageServerError(POSException.StorageError):
-    pass
+from ZODB.Transaction import Transaction
+from ZODB.TmpStore import TmpStore
 
 # We create a special fast pickler! This allows us

[-=- -=- -=- 1035 lines omitted -=- -=- -=-]

+        self.log = CommitLog()
+        self.invalidated = []
+
+        # Store information about the call that blocks
+        self.name = None
+        self.args = None
+
+    def tpc_begin(self, txn, tid, status):
+        self.txn = txn
+        self.tid = tid
+        self.status = status
+
+    def store(self, oid, serial, data, version):
+        self.log.store(oid, serial, data, version)
+
+    def tpc_abort(self):
+        pass # just forget about this strategy
+
+    def tpc_finish(self):
+        raise RuntimeError, "Logic error.  This method must not be called."
+
+    def tpc_vote(self):
+        self.name = "tpc_vote"
+        self.args = ()
+        return self.block()
+
+    def commitVersion(self, src, dest):
+        self.name = "commitVersion"
+        self.args = src, dest
+        return self.block()
+
+    def abortVersion(self, src):
+        self.name = "abortVersion"
+        self.args = src,
+        return self.block()
+
+    def transactionalUndo(self, trans_id):
+        self.name = "transactionalUndo"
+        self.args = trans_id,
+        return self.block()
+
+    def restart(self, new_strategy):
+        # called by the storage when the storage is available
+        new_strategy.tpc_begin(self.txn, self.tid, self.status)
+        loads, loader = self.log.get_loader()
+        for i in range(loads):
+            oid, serial, data, version = loader.load()
+            new_strategy.store(oid, serial, data, version)
+        meth = getattr(new_strategy, self.name)
+        return meth(*self.args)

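restart() above replays tpc_begin(), every logged store(), and finally the one call that had to block. For that to work the new strategy only needs the methods invoked here; the outline below infers their shape from those calls and the standard storage API, and is an illustration rather than code from this checkin:

    # Outline of what a non-delayed strategy must provide for restart().
    class ImmediateStrategyOutline:

        def __init__(self, storage):
            self.storage = storage

        def tpc_begin(self, txn, tid, status):
            self.txn = txn
            self.storage.tpc_begin(txn, tid, status)

        def store(self, oid, serial, data, version):
            return self.storage.store(oid, serial, data, version, self.txn)

        def tpc_vote(self):
            return self.storage.tpc_vote(self.txn)

        def commitVersion(self, src, dest):
            return self.storage.commitVersion(src, dest, self.txn)

        def abortVersion(self, src):
            return self.storage.abortVersion(src, self.txn)

        def transactionalUndo(self, trans_id):
            return self.storage.transactionalUndo(trans_id, self.txn)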

=== ZEO/ZEO/__init__.py 1.8 => 1.9 ===
 # 
 ##############################################################################
-
-import fap


=== ZEO/ZEO/smac.py 1.16 => 1.17 ===
 __version__ = "$Revision$"[11:-2]
 
-import asyncore, string, struct, zLOG, sys, Acquisition
+import asyncore, struct
+from Exceptions import Disconnected
+from zLOG import LOG, TRACE, ERROR, INFO, BLATHER
+from types import StringType
+
 import socket, errno
-from logger import zLogger
 
 # Use the dictionary to make sure we get the minimum number of errno
 # entries.   We expect that EWOULDBLOCK == EAGAIN on most systems --
@@ -38,81 +41,103 @@
 expected_socket_write_errors = tuple(tmp_dict.keys())
 del tmp_dict
 
-class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
+class SizedMessageAsyncConnection(asyncore.dispatcher):
+    __super_init = asyncore.dispatcher.__init__
+    __super_close = asyncore.dispatcher.close
+
+    __closed = 1 # Marker indicating that we're closed
 
-    __append=None # Marker indicating that we're closed
+    socket = None # to outwit Sam's getattr
 
-    socket=None # to outwit Sam's getattr
+    READ_SIZE = 8096
 
     def __init__(self, sock, addr, map=None, debug=None):
-        SizedMessageAsyncConnection.inheritedAttribute(
-            '__init__')(self, sock, map)
-        self.addr=addr
-        if debug is None and __debug__:
-            self._debug = zLogger("smac")
-        else:
+        self.addr = addr
+        if debug is not None:
             self._debug = debug
-        self.__state=None
-        self.__inp=None
-        self.__inpl=0
-        self.__l=4
-        self.__output=output=[]
-        self.__append=output.append
-        self.__pop=output.pop
-
-    def handle_read(self,
-                    join=string.join, StringType=type(''), _type=type,
-                    _None=None):
-
+        elif not hasattr(self, '_debug'):
+            self._debug = __debug__ and 'smac'
+        self.__state = None
+        self.__inp = None # None, a single String, or a list
+        self.__input_len = 0
+        self.__msg_size = 4
+        self.__output = []
+        self.__closed = None
+        self.__super_init(sock, map)
+
+    # XXX avoid expensive getattr calls?  Can't remember exactly what
+    # this comment was supposed to mean, but it has something to do
+    # with the way asyncore uses getattr and uses if sock:
+    def __nonzero__(self):
+        return 1
+
+    def handle_read(self):
+        # Use a single __inp buffer and integer indexes to make this
+        # fast.
         try:
             d=self.recv(8096)
         except socket.error, err:
             if err[0] in expected_socket_read_errors:
                 return
             raise
-        if not d: return
+        if not d:
+            return
 
-        inp=self.__inp
-        if inp is _None:
-            inp=d
-        elif _type(inp) is StringType:
-            inp=[inp,d]
+        input_len = self.__input_len + len(d)
+        msg_size = self.__msg_size
+        state = self.__state
+
+        inp = self.__inp
+        if msg_size > input_len:
+            if inp is None:
+                self.__inp = d
+            elif type(self.__inp) is StringType:
+                self.__inp = [self.__inp, d]
+            else:
+                self.__inp.append(d)
+            self.__input_len = input_len
+            return # keep waiting for more input
+
+        # load all previous input and d into single string inp
+        if isinstance(inp, StringType):
+            inp = inp + d
+        elif inp is None:
+            inp = d
         else:
             inp.append(d)
+            inp = "".join(inp)
 
-        inpl=self.__inpl+len(d)
-        l=self.__l
-            
-        while 1:
-
-            if l <= inpl:
-                # Woo hoo, we have enough data
-                if _type(inp) is not StringType: inp=join(inp,'')
-                d=inp[:l]
-                inp=inp[l:]
-                inpl=inpl-l                
-                if self.__state is _None:
-                    # waiting for message
-                    l=struct.unpack(">i",d)[0]
-                    self.__state=1
-                else:
-                    l=4
-                    self.__state=_None
-                    self.message_input(d)
+        offset = 0
+        while (offset + msg_size) <= input_len:
+            msg = inp[offset:offset + msg_size]
+            offset = offset + msg_size
+            if state is None:
+                # waiting for message
+                msg_size = struct.unpack(">i", msg)[0]
+                state = 1
             else:
-                break # not enough data
-                
-        self.__l=l
-        self.__inp=inp
-        self.__inpl=inpl
-
-    def readable(self): return 1
-    def writable(self): return not not self.__output
+                msg_size = 4
+                state = None
+                self.message_input(msg)
+
+        self.__state = state
+        self.__msg_size = msg_size
+        self.__inp = inp[offset:]
+        self.__input_len = input_len - offset
+
+    def readable(self):
+        return 1
+
+    def writable(self):
+        if len(self.__output) == 0:
+            return 0
+        else:
+            return 1
 
     def handle_write(self):
-        output=self.__output
+        output = self.__output
         while output:
-            v=output[0]
+            v = output[0]
             try:
                 n=self.send(v)
             except socket.error, err:
@@ -120,37 +145,33 @@
                     break # we couldn't write anything
                 raise
             if n < len(v):
-                output[0]=v[n:]
+                output[0] = v[n:]
                 break # we can't write any more
             else:
                 del output[0]
-                #break # waaa
-
 
     def handle_close(self):
         self.close()
 
-    def message_output(self, message,
-                       pack=struct.pack, len=len):
-        if self._debug is not None:
-            if len(message) > 40:
-                m = message[:40]+' ...'
-            else:
-                m = message
-            self._debug.trace('message_output %s' % `m`)
+    def message_output(self, message):
+        if __debug__:
+            if self._debug:
+                if len(message) > 40:
+                    m = message[:40]+' ...'
+                else:
+                    m = message
+                LOG(self._debug, TRACE, 'message_output %s' % `m`)
 
-        append=self.__append
-        if append is None:
-            raise Disconnected("This action is temporarily unavailable.<p>")
-        
-        append(pack(">i",len(message))+message)
+        if self.__closed is not None:
+            raise Disconnected, (
+                "This action is temporarily unavailable."
+                "<p>"
+                )
+        # do two separate appends to avoid copying the message string
+        self.__output.append(struct.pack(">i", len(message)))
+        self.__output.append(message)
 
     def close(self):
-        if self.__append is not None:
-            self.__append=None
-            SizedMessageAsyncConnection.inheritedAttribute('close')(self)
-
-class Disconnected(Exception):
-    """The client has become disconnected from the server
-    """
-    
+        if self.__closed is None:
+            self.__closed = 1
+            self.__super_close()

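On the wire, handle_read() and message_output() speak a very simple protocol: a 4-byte big-endian length followed by that many bytes of payload. A blocking-socket sketch of the same framing, purely for illustration (recv_exact is a made-up helper, not part of smac):

    # Sized-message framing: 4-byte big-endian length prefix + payload.
    import struct
    from ZEO.Exceptions import Disconnected

    def send_sized(sock, message):
        sock.sendall(struct.pack(">i", len(message)) + message)

    def recv_sized(sock):
        size = struct.unpack(">i", recv_exact(sock, 4))[0]
        return recv_exact(sock, size)

    def recv_exact(sock, n):
        data = ''
        while len(data) < n:
            chunk = sock.recv(n - len(data))
            if not chunk:
                raise Disconnected("peer closed connection")
            data = data + chunk
        return data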

=== ZEO/ZEO/start.py 1.32 => 1.33 ===
 # 
 ##############################################################################
-
 """Start the server storage.
 """
 
@@ -19,13 +18,16 @@
 
 import sys, os, getopt, string
 
+import StorageServer
+import asyncore
+
 def directory(p, n=1):
     d=p
     while n:
         d=os.path.split(d)[0]
         if not d or d=='.': d=os.getcwd()
         n=n-1
-        
+
     return d
 
 def get_storage(m, n, cache={}):
@@ -44,9 +46,11 @@
 
 def main(argv):
     me=argv[0]
-    sys.path[:]==filter(None, sys.path)
     sys.path.insert(0, directory(me, 2))
 
+    # XXX hack for profiling support
+    global unix, storages, zeo_pid, asyncore
+
     args=[]
     last=''
     for a in argv[1:]:
@@ -77,23 +81,22 @@
 
     fs = os.path.join(var, 'Data.fs')
 
-    usage = """%s [options] [filename]
+    usage="""%s [options] [filename]
 
     where options are:
 
        -D -- Run in debug mode
 
-       -d -- Generate detailed debug logging without running
-             in the foreground.
+       -d -- Set STUPID_LOG_SEVERITY to -300
 
        -U -- Unix-domain socket file to listen on
-    
+
        -u username or uid number
 
          The username to run the ZEO server as. You may want to run
          the ZEO server as 'nobody' or some other user with limited
-         resouces. The only works under Unix, and if the storage
-         server is started by root.
+         resources. This only works under Unix, and if ZServer is
+         started by root.
 
        -p port -- port to listen on
 
@@ -116,30 +119,47 @@
             attr_name -- This is the name to which the storage object
               is assigned in the module.
 
+       -P file -- Run under profile and dump output to file.  Implies the
+          -s flag.
+
     if no file name is specified, then %s is used.
     """ % (me, fs)
 
     try:
-        opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
-    except getopt.error, err:
-        print err
+        opts, args = getopt.getopt(args, 'p:Dh:U:sS:u:P:d')
+    except getopt.error, msg:
         print usage
+        print msg
         sys.exit(1)
 
-    port=None
-    debug=detailed=0
-    host=''
-    unix=None
-    Z=1
-    UID='nobody'
+    port = None
+    debug = 0
+    host = ''
+    unix = None
+    Z = 1
+    UID = 'nobody'
+    prof = None
+    detailed = 0
     for o, v in opts:
-        if o=='-p': port=string.atoi(v)
-        elif o=='-h': host=v
-        elif o=='-U': unix=v
-        elif o=='-u': UID=v
-        elif o=='-D': debug=1
-        elif o=='-d': detailed=1
-        elif o=='-s': Z=0
+        if o=='-p':
+            port = int(v)
+        elif o=='-h':
+            host = v
+        elif o=='-U':
+            unix = v
+        elif o=='-u':
+            UID = v
+        elif o=='-D':
+            debug = 1
+        elif o=='-d':
+            detailed = 1
+        elif o=='-s':
+            Z = 0
+        elif o=='-P':
+            prof = v
+
+    if prof:
+        Z = 0
 
     if port is None and unix is None:
         print usage
@@ -153,14 +173,16 @@
             sys.exit(1)
         fs=args[0]
 
-    if debug: os.environ['Z_DEBUG_MODE']='1'
-
-    if detailed: os.environ['STUPID_LOG_SEVERITY']='-99999'
+    __builtins__.__debug__=debug
+    if debug:
+        os.environ['Z_DEBUG_MODE'] = '1'
+    if detailed:
+        os.environ['STUPID_LOG_SEVERITY'] = '-300'
 
     from zLOG import LOG, INFO, ERROR
 
     # Try to set uid to "-u" -provided uid.
-    # Try to set gid to  "-u" user's primary group. 
+    # Try to set gid to  "-u" user's primary group.
     # This will only work if this script is run by root.
     try:
         import pwd
@@ -175,7 +197,7 @@
                 uid = pwd.getpwuid(UID)[2]
                 gid = pwd.getpwuid(UID)[3]
             else:
-                raise KeyError 
+                raise KeyError
             try:
                 if gid is not None:
                     try:
@@ -200,7 +222,7 @@
     try:
 
         import ZEO.StorageServer, asyncore
-
+        
         storages={}
         for o, v in opts:
             if o=='-S':
@@ -243,15 +265,15 @@
 
         if not unix: unix=host, port
 
-        ZEO.StorageServer.StorageServer(unix, storages)
-
+        StorageServer.StorageServer(unix, storages)
+        
         try:
             ppid, pid = os.getppid(), os.getpid()
         except:
             pass # getpid not supported
         else:
             open(zeo_pid,'w').write("%s %s" % (ppid, pid))
-
+            
     except:
         # Log startup exception and tell zdaemon not to restart us.
         info = sys.exc_info()
@@ -269,7 +291,6 @@
 
     asyncore.loop()
 
-
 def rotate_logs():
     import zLOG
     if hasattr(zLOG.log_write, 'reinitialize'):
@@ -292,29 +313,21 @@
     # unnecessary, since we now use so_reuseaddr.
     for ignored in 1,2:
         for socket in asyncore.socket_map.values():
-            try:
-                socket.close()
-            except:
-                pass
+            try: socket.close()
+            except: pass
 
     for storage in storages.values():
-        try:
-            storage.close()
-        except:
-            pass
+        try: storage.close()
+        except: pass
 
     try:
         from zLOG import LOG, INFO
         LOG('ZEO Server', INFO,
             "Shutting down (%s)" % (die and "shutdown" or "restart")
             )
-    except:
-        pass
-    
-    if die:
-        sys.exit(0)
-    else:
-        sys.exit(1)
+    except: pass
+
+    if die: sys.exit(0)
+    else: sys.exit(1)
 
-if __name__ == '__main__':
-    main(sys.argv)
+if __name__=='__main__': main(sys.argv)


=== ZEO/ZEO/trigger.py 1.5 => 1.6 ===
 # 
 ##############################################################################
-
-# This module is a simplified version of the select_trigger module
-# from Sam Rushing's Medusa server.
-
 import asyncore
-import errno
+
 import os
 import socket
 import string
 import thread
-    
+
 if os.name == 'posix':
 
-    class trigger(asyncore.file_dispatcher):
+    class trigger (asyncore.file_dispatcher):
 
         "Wake up a call to select() running in the main thread"
 
@@ -56,46 +52,50 @@
         # new data onto a channel's outgoing data queue at the same time that
         # the main thread is trying to remove some]
 
-        def __init__(self):
+        def __init__ (self):
             r, w = self._fds = os.pipe()
             self.trigger = w
-            asyncore.file_dispatcher.__init__(self, r)
+            asyncore.file_dispatcher.__init__ (self, r)
             self.lock = thread.allocate_lock()
             self.thunks = []
+            self._closed = None
 
-        def __del__(self):
-            os.close(self._fds[0])
-            os.close(self._fds[1])
+        # Override the asyncore close() method, because it seems that
+        # it would only close the r file descriptor and not w.  The
+        # constructor calls file_dispatcher.__init__ and passes r,
+        # which would get stored in a file_wrapper and get closed by
+        # the default close.  But that would leave w open...
+
+        def close(self):
+            if self._closed is None:
+                self._closed = 1
+                self.del_channel()
+                for fd in self._fds:
+                    os.close(fd)
 
-        def __repr__(self):
-            return '<select-trigger(pipe) at %x>' % id(self)
+        def __repr__ (self):
+            return '<select-trigger (pipe) at %x>' % id(self)
 
-        def readable(self):
+        def readable (self):
             return 1
 
-        def writable(self):
+        def writable (self):
             return 0
 
-        def handle_connect(self):
+        def handle_connect (self):
             pass
 
-        def pull_trigger(self, thunk=None):
-            # print 'PULL_TRIGGER: ', len(self.thunks)
+        def pull_trigger (self, thunk=None):
             if thunk:
                 try:
                     self.lock.acquire()
-                    self.thunks.append(thunk)
+                    self.thunks.append (thunk)
                 finally:
                     self.lock.release()
-            os.write(self.trigger, 'x')
+            os.write (self.trigger, 'x')
 
-        def handle_read(self):
-            try:
-                self.recv(8192)
-            except os.error, err:
-                if err[0] == errno.EAGAIN: # resource temporarily unavailable
-                    return
-                raise
+        def handle_read (self):
+            self.recv (8192)
             try:
                 self.lock.acquire()
                 for thunk in self.thunks:
@@ -104,7 +104,7 @@
                     except:
                         nil, t, v, tbinfo = asyncore.compact_traceback()
                         print ('exception in trigger thunk:'
-                               '(%s:%s %s)' % (t, v, tbinfo))
+                               ' (%s:%s %s)' % (t, v, tbinfo))
                 self.thunks = []
             finally:
                 self.lock.release()
@@ -116,13 +116,13 @@
 
     # win32-safe version
 
-    class trigger(asyncore.dispatcher):
+    class trigger (asyncore.dispatcher):
 
         address = ('127.9.9.9', 19999)
 
-        def __init__(self):
-            a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        def __init__ (self):
+            a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+            w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
 
             # set TCP_NODELAY to true to avoid buffering
             w.setsockopt(socket.IPPROTO_TCP, 1, 1)
@@ -139,51 +139,46 @@
                     if port <= 19950:
                         raise 'Bind Error', 'Cannot bind trigger!'
                     port=port - 1
-            
-            a.listen(1)
-            w.setblocking(0)
+
+            a.listen (1)
+            w.setblocking (0)
             try:
-                w.connect(self.address)
+                w.connect (self.address)
             except:
                 pass
             r, addr = a.accept()
             a.close()
-            w.setblocking(1)
+            w.setblocking (1)
             self.trigger = w
 
-            asyncore.dispatcher.__init__(self, r)
+            asyncore.dispatcher.__init__ (self, r)
             self.lock = thread.allocate_lock()
             self.thunks = []
             self._trigger_connected = 0
 
-        def __repr__(self):
+        def __repr__ (self):
             return '<select-trigger (loopback) at %x>' % id(self)
 
-        def readable(self):
+        def readable (self):
             return 1
 
-        def writable(self):
+        def writable (self):
             return 0
 
-        def handle_connect(self):
+        def handle_connect (self):
             pass
 
-        def pull_trigger(self, thunk=None):
+        def pull_trigger (self, thunk=None):
             if thunk:
                 try:
                     self.lock.acquire()
-                    self.thunks.append(thunk)
+                    self.thunks.append (thunk)
                 finally:
                     self.lock.release()
-            self.trigger.send('x')
+            self.trigger.send ('x')
 
-        def handle_read(self):
-            try:
-                self.recv(8192)
-            except os.error, err:
-                if err[0] == errno.EAGAIN: # resource temporarily unavailable
-                    return
-                raise
+        def handle_read (self):
+            self.recv (8192)
             try:
                 self.lock.acquire()
                 for thunk in self.thunks: