[Zodb-checkins] CVS: StandaloneZODB/ZEO - ClientStub.py:1.2 Exceptions.py:1.2 ServerStub.py:1.2 TransactionBuffer.py:1.2 zrpc2.py:1.2 ClientCache.py:1.19 ClientStorage.py:1.36 StorageServer.py:1.33 smac.py:1.12 start.py:1.27

Jeremy Hylton jeremy@zope.com
Fri, 11 Jan 2002 14:33:20 -0500


Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv30669

Modified Files:
	ClientCache.py ClientStorage.py StorageServer.py smac.py 
	start.py 
Added Files:
	ClientStub.py Exceptions.py ServerStub.py TransactionBuffer.py 
	zrpc2.py 
Log Message:
Changes from the ZEO-ZRPC-Dev branch merge.


=== StandaloneZODB/ZEO/ClientStub.py 1.1 => 1.2 ===
+
+class ClientStorage:
+    def __init__(self, rpc):
+        self.rpc = rpc
+        
+    def beginVerify(self):
+        self.rpc.callAsync('begin')
+
+    # XXX what's the difference between these two?
+
+    def invalidate(self, args):
+        self.rpc.callAsync('invalidate', args)
+
+    def Invalidate(self, args):
+        self.rpc.callAsync('Invalidate', args)
+
+    def endVerify(self):
+        self.rpc.callAsync('end')
+
+    def serialno(self, arg):
+        self.rpc.callAsync('serialno', arg)
+
+    def info(self, arg):
+        self.rpc.callAsync('info', arg)


=== StandaloneZODB/ZEO/Exceptions.py 1.1 => 1.2 ===
+
+class Disconnected(Exception):
+    """Exception raised when a ZEO client is disconnected from the
+    ZEO server."""


=== StandaloneZODB/ZEO/ServerStub.py 1.1 => 1.2 ===
+
+class StorageServer:
+
+    def __init__(self, rpc):
+        self.rpc = rpc
+
+    def register(self, storage_name, read_only):
+        self.rpc.call('register', storage_name, read_only)
+
+    def get_info(self):
+        return self.rpc.call('get_info')
+
+    def get_size_info(self):
+        return self.rpc.call('get_size_info')
+
+    def beginZeoVerify(self):
+        self.rpc.callAsync('beginZeoVerify')
+
+    def zeoVerify(self, oid, s, sv):
+        self.rpc.callAsync('zeoVerify', oid, s, sv)
+
+    def endZeoVerify(self):
+        self.rpc.callAsync('endZeoVerify')
+
+    def new_oids(self, n=None):
+        if n is None:
+            return self.rpc.call('new_oids')
+        else:
+            return self.rpc.call('new_oids', n)
+            
+    def pack(self, t, wait=None):
+        if wait is None:
+            self.rpc.call('pack', t)
+        else:
+            self.rpc.call('pack', t, wait)
+
+    def zeoLoad(self, oid):
+        return self.rpc.call('zeoLoad', oid)
+
+    def storea(self, oid, serial, data, version, id):
+        self.rpc.callAsync('storea', oid, serial, data, version, id)
+
+    def tpc_begin(self, id, user, descr, ext):
+        return self.rpc.call('tpc_begin', id, user, descr, ext)
+
+    def vote(self, trans_id):
+        return self.rpc.call('vote', trans_id)
+
+    def tpc_finish(self, id):
+        return self.rpc.call('tpc_finish', id)
+
+    def tpc_abort(self, id):
+        self.rpc.callAsync('tpc_abort', id)
+
+    def abortVersion(self, src, id):
+        return self.rpc.call('abortVersion', src, id)
+
+    def commitVersion(self, src, dest, id):
+        return self.rpc.call('commitVersion', src, dest, id)
+        
+    def history(self, oid, version, length=None):
+        if length is not None:
+            return self.rpc.call('history', oid, version)
+        else:
+            return self.rpc.call('history', oid, version, length)
+
+    def load(self, oid, version):
+        return self.rpc.call('load', oid, version)
+
+    def loadSerial(self, oid, serial):
+        return self.rpc.call('loadSerial', oid, serial)
+
+    def modifiedInVersion(self, oid):
+        return self.rpc.call('modifiedInVersion', oid)
+
+    def new_oid(self, last=None):
+        if last is None:
+            return self.rpc.call('new_oid')
+        else:
+            return self.rpc.call('new_oid', last)
+
+    def store(self, oid, serial, data, version, trans):
+        return self.rpc.call('store', oid, serial, data, version, trans)
+
+    def transactionalUndo(self, trans_id, trans):
+        return self.rpc.call('transactionalUndo', trans_id, trans)
+
+    def undo(self, trans_id):
+        return self.rpc.call('undo', trans_id)
+
+    def undoLog(self, first, last):
+        # XXX filter not allowed across RPC
+        return self.rpc.call('undoLog', first, last)
+
+    def undoInfo(self, first, last, spec):
+        return self.rpc.call('undoInfo', first, last, spec)
+
+    def versionEmpty(self, vers):
+        return self.rpc.call('versionEmpty', vers)
+
+    def versions(self, max=None):
+        if max is None:
+            return self.rpc.call('versions')
+        else:
+            return self.rpc.call('versions', max)
+
+    


=== StandaloneZODB/ZEO/TransactionBuffer.py 1.1 => 1.2 ===
+
+A transaction may generate enough data that it is not practical to
+always hold pending updates in memory.  Instead, a TransactionBuffer
+is used to store the data until a commit or abort.
+"""
+
+# XXX Figure out what a sensible storage format is
+
+# XXX A faster implementation might store trans data in memory until
+# it reaches a certain size.
+
+import tempfile
+import cPickle
+
+class TransactionBuffer:
+    
+    def __init__(self):
+        self.file = tempfile.TemporaryFile()
+        self.count = 0
+        self.size = 0
+        # It's safe to use a fast pickler because the only objects
+        # stored are builtin types -- strings or None.
+        self.pickler = cPickle.Pickler(self.file, 1)
+        self.pickler.fast = 1
+
+    def store(self, oid, version, data):
+        """Store oid, version, data for later retrieval"""
+        self.pickler.dump((oid, version, data))
+        self.count += 1
+        # Estimate per-record cache size
+        self.size = self.size + len(data) + (27 + 12)
+        if version:
+            self.size = self.size + len(version) + 4
+
+    def invalidate(self, oid, version):
+        self.pickler.dump((oid, version, None))
+        self.count += 1
+
+    def clear(self):
+        """Mark the buffer as empty"""
+        self.file.seek(0)
+        self.count = 0
+        self.size = 0
+
+    # XXX unchecked constraints:
+    # 1. can't call store() after begin_iterate()
+    # 2. must call clear() after iteration finishes
+
+    def begin_iterate(self):
+        """Move the file pointer in advance of iteration"""
+        self.file.flush()
+        self.file.seek(0)
+        self.unpickler = cPickle.Unpickler(self.file)
+
+    def next(self):
+        """Return next tuple of data or None if EOF"""
+        if self.count == 0:
+            del self.unpickler
+            return None
+        oid_ver_data = self.unpickler.load()
+        self.count -= 1
+        return oid_ver_data
+            
+    def get_size(self):
+        """Return size of data stored in buffer (just a hint)."""
+    
+        return self.size


=== StandaloneZODB/ZEO/zrpc2.py 1.1 => 1.2 === (615/715 lines abridged)
+
+The basic protocol is as:
+a pickled tuple containing: msgid, flags, method, args
+
+msgid is an integer.
+flags is an integer.
+    The only currently defined flag is ASYNC (0x1), which means
+    the client does not expect a reply.
+method is a string specifying the method to invoke.
+    For a reply, the method is ".reply".
+args is a tuple of the argument to pass to method.
+
+XXX need to specify a version number that describes the protocol.
+allow for future revision.
+
+XXX support multiple outstanding calls
+
+XXX factor out common pattern of deciding what protocol to use based
+on whether address is tuple or string
+"""
+
+import asyncore
+import errno
+import cPickle
+import os
+import select
+import socket
+import sys
+import threading
+import thread
+import time
+import traceback
+import types
+
+from cStringIO import StringIO
+
+from ZODB import POSException
+from ZEO import smac, trigger
+from Exceptions import Disconnected
+import zLOG
+import ThreadedAsync
+from Exceptions import Disconnected
+
+REPLY = ".reply" # message name used for replies
+ASYNC = 1
+
+_label = "zrpc:%s" % os.getpid()
+
+def new_label():
+    global _label

[-=- -=- -=- 615 lines omitted -=- -=- -=-]

+
+    def readable(self):
+        return 1
+
+    def handle_accept(self):
+        try:
+            sock, addr = self.accept()
+        except socket.error, msg:
+            log("accepted failed: %s" % msg)
+            return
+        c = self.factory(sock, addr, self.obj)
+        log("connect from %s: %s" % (repr(addr), c))
+        self.clients.append(c)
+
+class Handler:
+    """Base class used to handle RPC caller discovery"""
+
+    def set_caller(self, addr):
+        self.__caller = addr
+
+    def get_caller(self):
+        return self.__caller
+
+    def clear_caller(self):
+        self.__caller = None
+
+_globals = globals()
+_silly = ('__doc__',)
+
+def find_global(module, name):
+    """Helper for message unpickler"""
+    try:
+        m = __import__(module, _globals, _globals, _silly)
+    except ImportError, msg:
+        raise ZRPCError("import error %s: %s" % (module, msg))
+
+    try:
+        r = getattr(m, name)
+    except AttributeError:
+        raise ZRPCError("module %s has no global %s" % (module, name))
+        
+    safe = getattr(r, '__no_side_effects__', 0)
+    if safe:
+        return r
+
+    if type(r) == types.ClassType and issubclass(r, Exception):
+        return r
+
+    raise ZRPCError("Unsafe global: %s.%s" % (module, name))
+


=== StandaloneZODB/ZEO/ClientCache.py 1.18 => 1.19 ===
 from struct import pack, unpack
 from thread import allocate_lock
-import zLOG
 
-magic='ZEC0'
+import sys
+import zLOG
 
-def LOG(msg, level=zLOG.BLATHER):
+def log(msg, level=zLOG.INFO):
     zLOG.LOG("ZEC", level, msg)
 
+magic='ZEC0'
+
 class ClientCache:
 
     def __init__(self, storage='', size=20000000, client=None, var=None):
@@ -211,16 +213,14 @@
             f[0].write(magic)
             current=0
 
+        log("cache opened.  current = %s" % current)
+
         self._limit=size/2
         self._current=current
 
-    def close(self):
-        try:
-            self._f[self._current].close()
-        except (os.error, ValueError):
-            pass
-
     def open(self):
+         # XXX open is overloaded to perform two tasks for
+         # optimization reasons  
         self._acquire()
         try:
             self._index=index={}
@@ -235,6 +235,19 @@
             return serial.items()
         finally: self._release()
 
+    def close(self):
+        for f in self._f:
+            if f is not None:
+                f.close()
+
+    def verify(self, verifyFunc):
+        """Call the verifyFunc on every object in the cache.
+
+        verifyFunc(oid, serialno, version)
+        """
+        for oid, (s, vs) in self.open():
+            verifyFunc(oid, s, vs)
+
     def invalidate(self, oid, version):
         self._acquire()
         try:
@@ -373,8 +386,6 @@
                     self._f[current]=open(self._p[current],'w+b')
                 else:
                     # Temporary cache file:
-                    if self._f[current] is not None:
-                        self._f[current].close()
                     self._f[current] = tempfile.TemporaryFile(suffix='.zec')
                 self._f[current].write(magic)
                 self._pos=pos=4
@@ -383,55 +394,57 @@
 
     def store(self, oid, p, s, version, pv, sv):
         self._acquire()
-        try: self._store(oid, p, s, version, pv, sv)
-        finally: self._release()
+        try:
+            self._store(oid, p, s, version, pv, sv)
+        finally:
+            self._release()
 
     def _store(self, oid, p, s, version, pv, sv):
         if not s:
-            p=''
-            s='\0\0\0\0\0\0\0\0'
-        tlen=31+len(p)
+            p = ''
+            s = '\0\0\0\0\0\0\0\0'
+        tlen = 31 + len(p)
         if version:
-            tlen=tlen+len(version)+12+len(pv)
-            vlen=len(version)
+            tlen = tlen + len(version) + 12 + len(pv)
+            vlen = len(version)
         else:
-            vlen=0
+            vlen = 0
 
-        pos=self._pos
-        current=self._current
-        f=self._f[current]
-        f.seek(pos)
-        stlen=pack(">I",tlen)
-        write=f.write
-        write(oid+'v'+stlen+pack(">HI", vlen, len(p))+s)
-        if p: write(p)
+        stlen = pack(">I", tlen)
+        # accumulate various data to write into a list
+        l = [oid, 'v', stlen, pack(">HI", vlen, len(p)), s]
+        if p:
+            l.append(p)
         if version:
-            write(version)
-            write(pack(">I", len(pv)))
-            write(pv)
-            write(sv)
-
-        write(stlen)
+            l.extend([version, 
+                      pack(">I", len(pv)),
+                      pv, sv])
+        l.append(stlen)
+        f = self._f[self._current]
+        f.seek(self._pos)
+        f.write("".join(l))
 
-        if current: self._index[oid]=-pos
-        else: self._index[oid]=pos
+        if self._current:
+            self._index[oid] = - self._pos
+        else:
+            self._index[oid] = self._pos
 
-        self._pos=pos+tlen
+        self._pos += tlen
 
 def read_index(index, serial, f, current):
-    LOG("read_index(%s)" % f.name)
     seek=f.seek
     read=f.read
     pos=4
+    seek(0,2)
+    size=f.tell()
 
     while 1:
-        seek(pos)
+        f.seek(pos)
         h=read(27)
-
+        
         if len(h)==27 and h[8] in 'vni':
             tlen, vlen, dlen = unpack(">iHi", h[9:19])
-        else:
-            break
+        else: tlen=-1
         if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
             break
 
@@ -466,15 +479,3 @@
     except: pass
     
     return pos
-
-def main(files):
-    for file in files:
-        print file
-        index = {}
-        serial = {}
-        read_index(index, serial, open(file), 0)
-        print index.keys()
-
-if __name__ == "__main__":
-    import sys
-    main(sys.argv[1:])


=== StandaloneZODB/ZEO/ClientStorage.py 1.35 => 1.36 === (868/968 lines abridged)
 ##############################################################################
 """Network ZODB storage client
-"""
 
+XXX support multiple outstanding requests up until the vote?
+XXX is_connected() vis ClientDisconnected error
+"""
 __version__='$Revision$'[11:-2]
 
-import struct, time, os, socket, string, Sync, zrpc, ClientCache
-import tempfile, Invalidator, ExtensionClass, thread
-import ThreadedAsync
-
-now=time.time
+import cPickle
+import os
+import socket
+import string
+import struct
+import sys
+import tempfile
+import thread
+import threading
+import time
+from types import TupleType, StringType
 from struct import pack, unpack
-from ZODB import POSException, BaseStorage
+
+import ExtensionClass, Sync, ThreadLock
+import ClientCache
+import zrpc2
+import ServerStub
+from TransactionBuffer import TransactionBuffer
+
+from ZODB import POSException
 from ZODB.TimeStamp import TimeStamp
-from zLOG import LOG, PROBLEM, INFO
+from zLOG import LOG, PROBLEM, INFO, BLATHER
+from Exceptions import Disconnected
 
-try: from ZODB.ConflictResolution import ResolvedSerial
-except: ResolvedSerial='rs'
+def log2(type, msg, subsys="ClientStorage %d" % os.getpid()):
+    LOG(subsys, type, msg)
 
-TupleType=type(())
+try:
+    from ZODB.ConflictResolution import ResolvedSerial
+except ImportError:
+    ResolvedSerial = 'rs'

[-=- -=- -=- 868 lines omitted -=- -=- -=-]

-    _w.append(t)
-    return t
+        return self._server.versions(max)
+
+    # below are methods invoked by the StorageServer
+
+    def serialno(self, arg):
+        self._serials.append(arg)
+
+    def info(self, dict):
+        self._info.update(dict)
+
+    def begin(self):
+        self._tfile = tempfile.TemporaryFile()
+        self._pickler = cPickle.Pickler(self._tfile, 1)
+        self._pickler.fast = 1 # Don't use the memo
+
+    def invalidate(self, args):
+        if self._pickler is None:
+            return
+        self._pickler.dump(args)
+
+    def end(self):
+        if self._pickler is None:
+            return
+        self._pickler.dump((0,0))
+##        self._pickler.dump = None
+        self._tfile.seek(0)
+        unpick = cPickle.Unpickler(self._tfile)
+        self._tfile = None
+
+        while 1:
+            oid, version = unpick.load()
+            if not oid:
+                break
+            self._cache.invalidate(oid, version=version)
+            self._db.invalidate(oid, version=version)
+
+    def Invalidate(self, args):
+        # XXX _db could be None
+        for oid, version in args:
+            self._cache.invalidate(oid, version=version)
+            try:
+                self._db.invalidate(oid, version=version)
+            except AttributeError, msg:
+                log2(PROBLEM,
+                    "Invalidate(%s, %s) failed for _db: %s" % (repr(oid),
+                                                               repr(version),
+                                                               msg))
+                    


=== StandaloneZODB/ZEO/StorageServer.py 1.32 => 1.33 === (750/850 lines abridged)
+##############################################################################
 # 
 # Zope Public License (ZPL) Version 1.0
 # -------------------------------------
@@ -59,7 +59,7 @@
 #    labeled as unofficial distributions.  Modifications which do not
 #    carry the name Zope may be packaged in any form, as long as they
 #    conform to all of the clauses above.
-# 
+#
 # 
 # Disclaimer
 # 
@@ -82,527 +82,394 @@
 # attributions are listed in the accompanying credits file.
 # 
 ##############################################################################
+"""Network ZODB storage server
 
-__version__ = "$Revision$"[11:-2]
+This server acts as a front-end for one or more real storages, like
+file storage or Berkeley storage.
 
-import asyncore, socket, string, sys, os
-from smac import SizedMessageAsyncConnection
-from ZODB import POSException
+XXX Need some basic access control-- a declaration of the methods
+exported for invocation by the server.
+"""
+
+import asyncore
 import cPickle
-from cPickle import Unpickler
-from ZODB.POSException import TransactionError, UndoError, VersionCommitError
-from ZODB.Transaction import Transaction
-import traceback
-from zLOG import LOG, INFO, ERROR, TRACE, BLATHER
+import os
+import sys
+import threading
+import types
+
+import ClientStub
+import zrpc2
+import zLOG
+
+from zrpc2 import Dispatcher, Handler, ManagedServerConnection, Delay
+from ZODB.POSException import StorageError, StorageTransactionError, \
+     TransactionError, ReadOnlyError
 from ZODB.referencesf import referencesf

[-=- -=- -=- 750 lines omitted -=- -=- -=-]

-    try:
-        port='', int(port)
-    except:
-        pass
-
-    d = {'1': ZODB.FileStorage.FileStorage(name)}
-    StorageServer(port, d)
-    asyncwrap.loop()
+            self.server.invalidate(self, self.__storage_id,
+                                   self.__invalidated,
+                                   self.get_size_info())
+
+        if not self._handle_waiting():
+            self._transaction = None
+            self.__invalidated = []
+
+    def tpc_abort(self, id):
+        if not self._check_tid(id):
+            return
+        r = self.__storage.tpc_abort(self._transaction)
+        assert self.__storage._transaction is None
+
+        if not self._handle_waiting():
+            self._transaction = None
+            self.__invalidated = []
+
+    def _restart_delayed_transaction(self, delay, trans):
+        self._transaction = trans
+        self.__storage.tpc_begin(trans)
+        self.__invalidated = []
+        assert self._transaction.id == self.__storage._transaction.id
+        delay.reply(None)
+
+    def _handle_waiting(self):
+        if self.__storage.__waiting:
+            delay, proxy, trans = self.__storage.__waiting.pop(0)
+            proxy._restart_delayed_transaction(delay, trans)
+            if self is proxy:
+                return 1
+        
+    def new_oids(self, n=100):
+        """Return a sequence of n new oids, where n defaults to 100"""
+        if n < 0:
+            n = 1
+        return [self.__storage.new_oid() for i in range(n)]
+
+def fixup_storage(storage):
+    # backwards compatibility hack
+    if not hasattr(storage,'tpc_vote'):
+        storage.tpc_vote = lambda *args: None


=== StandaloneZODB/ZEO/smac.py 1.11 => 1.12 ===
 __version__ = "$Revision$"[11:-2]
 
-import asyncore, string, struct, zLOG, sys, Acquisition
+import asyncore, struct
+from Exceptions import Disconnected
+from zLOG import LOG, TRACE, ERROR, INFO, BLATHER
+from types import StringType
+
 import socket, errno
-from zLOG import LOG, TRACE, ERROR, INFO
 
 # Use the dictionary to make sure we get the minimum number of errno
 # entries.   We expect that EWOULDBLOCK == EAGAIN on most systems --
@@ -109,81 +112,101 @@
 expected_socket_write_errors = tuple(tmp_dict.keys())
 del tmp_dict
 
-class SizedMessageAsyncConnection(Acquisition.Explicit, asyncore.dispatcher):
+class SizedMessageAsyncConnection(asyncore.dispatcher):
+    __super_init = asyncore.dispatcher.__init__
+    __super_close = asyncore.dispatcher.close
+
+    __closed = 1 # Marker indicating that we're closed
 
-    __append=None # Marker indicating that we're closed
+    socket = None # to outwit Sam's getattr
 
-    socket=None # to outwit Sam's getattr
+    READ_SIZE = 8096
 
     def __init__(self, sock, addr, map=None, debug=None):
-        SizedMessageAsyncConnection.inheritedAttribute(
-            '__init__')(self, sock, map)
-        self.addr=addr
+        self.__super_init(sock, map)
+        self.addr = addr
         if debug is not None:
-            self._debug=debug
+            self._debug = debug
         elif not hasattr(self, '_debug'):
-            self._debug=__debug__ and 'smac'
-        self.__state=None
-        self.__inp=None
-        self.__inpl=0
-        self.__l=4
-        self.__output=output=[]
-        self.__append=output.append
-        self.__pop=output.pop
-
-    def handle_read(self,
-                    join=string.join, StringType=type(''), _type=type,
-                    _None=None):
-
+            self._debug = __debug__ and 'smac'
+        self.__state = None
+        self.__inp = None # None, a single String, or a list
+        self.__input_len = 0
+        self.__msg_size = 4
+        self.__output = []
+        self.__closed = None
+
+    # XXX avoid expensive getattr calls?
+    def __nonzero__(self):
+        return 1
+
+    def handle_read(self):
+        # Use a single __inp buffer and integer indexes to make this
+        # fast.
         try:
             d=self.recv(8096)
         except socket.error, err:
             if err[0] in expected_socket_read_errors:
                 return
             raise
-        if not d: return
+        if not d:
+            return
 
-        inp=self.__inp
-        if inp is _None:
-            inp=d
-        elif _type(inp) is StringType:
-            inp=[inp,d]
+        input_len = self.__input_len + len(d)
+        msg_size = self.__msg_size
+        state = self.__state
+
+        inp = self.__inp
+        if msg_size > input_len:
+            if inp is None:
+                self.__inp = d
+            elif type(self.__inp) is StringType:
+                self.__inp = [self.__inp, d]
+            else:
+                self.__inp.append(d)
+            self.__input_len = input_len
+            return # keep waiting for more input
+
+        # load all previous input and d into single string inp
+        if isinstance(inp, StringType):
+            inp = inp + d
+        elif inp is None:
+            inp = d
         else:
             inp.append(d)
+            inp = "".join(inp)
 
-        inpl=self.__inpl+len(d)
-        l=self.__l
-            
-        while 1:
-
-            if l <= inpl:
-                # Woo hoo, we have enough data
-                if _type(inp) is not StringType: inp=join(inp,'')
-                d=inp[:l]
-                inp=inp[l:]
-                inpl=inpl-l                
-                if self.__state is _None:
-                    # waiting for message
-                    l=struct.unpack(">i",d)[0]
-                    self.__state=1
-                else:
-                    l=4
-                    self.__state=_None
-                    self.message_input(d)
+        offset = 0
+        while (offset + msg_size) <= input_len:
+            msg = inp[offset:offset + msg_size]
+            offset = offset + msg_size
+            if state is None:
+                # waiting for message
+                msg_size = struct.unpack(">i", msg)[0]
+                state = 1
             else:
-                break # not enough data
-                
-        self.__l=l
-        self.__inp=inp
-        self.__inpl=inpl
+                msg_size = 4
+                state = None
+                self.message_input(msg)
+
+        self.__state = state
+        self.__msg_size = msg_size
+        self.__inp = inp[offset:]
+        self.__input_len = input_len - offset
 
-    def readable(self): return 1
-    def writable(self): return not not self.__output
+    def readable(self):
+        return 1
+    
+    def writable(self):
+        if len(self.__output) == 0:
+            return 0
+        else:
+            return 1
 
     def handle_write(self):
-        output=self.__output
+        output = self.__output
         while output:
-            v=output[0]
+            v = output[0]
             try:
                 n=self.send(v)
             except socket.error, err:
@@ -191,42 +214,33 @@
                     break # we couldn't write anything
                 raise
             if n < len(v):
-                output[0]=v[n:]
+                output[0] = v[n:]
                 break # we can't write any more
             else:
                 del output[0]
-                #break # waaa
-
 
     def handle_close(self):
         self.close()
 
-    def message_output(self, message,
-                       pack=struct.pack, len=len):
-        if self._debug:
-            if len(message) > 40: m=message[:40]+' ...'
-            else: m=message
-            LOG(self._debug, TRACE, 'message_output %s' % `m`)
-
-        append=self.__append
-        if append is None:
-            raise Disconnected("This action is temporarily unavailable.<p>")
-        
-        append(pack(">i",len(message))+message)
-
-    def log_info(self, message, type='info'):
-        if type=='error': type=ERROR
-        else: type=INFO
-        LOG('ZEO', type, message)
+    def message_output(self, message):
+        if __debug__:
+            if self._debug:
+                if len(message) > 40:
+                    m = message[:40]+' ...'
+                else:
+                    m = message
+                LOG(self._debug, TRACE, 'message_output %s' % `m`)
 
-    log=log_info
+        if self.__closed is not None:
+            raise Disconnected, (
+                "This action is temporarily unavailable."
+                "<p>"
+                )
+        # do two separate appends to avoid copying the message string
+        self.__output.append(struct.pack(">i", len(message)))
+        self.__output.append(message)
 
     def close(self):
-        if self.__append is not None:
-            self.__append=None
-            SizedMessageAsyncConnection.inheritedAttribute('close')(self)
-
-class Disconnected(Exception):
-    """The client has become disconnected from the server
-    """
-    
+        if self.__closed is None:
+            self.__closed = 1
+            self.__super_close()


=== StandaloneZODB/ZEO/start.py 1.26 => 1.27 ===
 import sys, os, getopt, string
 
+import StorageServer
+import asyncore
+
 def directory(p, n=1):
     d=p
     while n:
@@ -115,9 +118,11 @@
 
 def main(argv):
     me=argv[0]
-    sys.path[:]==filter(None, sys.path)
     sys.path.insert(0, directory(me, 2))
 
+    # XXX hack for profiling support
+    global unix, storages, zeo_pid, asyncore
+
     args=[]
     last=''
     for a in argv[1:]:
@@ -130,25 +135,13 @@
         args.append(a)
         last=a
 
-    if os.environ.has_key('INSTANCE_HOME'):
-        INSTANCE_HOME=os.environ['INSTANCE_HOME']
-    elif os.path.isdir(os.path.join(directory(me, 4),'var')):
-        INSTANCE_HOME=directory(me, 4)
-    else:
-        INSTANCE_HOME=os.getcwd()
-
-    if os.path.isdir(os.path.join(INSTANCE_HOME, 'var')):
-        var=os.path.join(INSTANCE_HOME, 'var')
-    else:
-        var=INSTANCE_HOME
+    INSTANCE_HOME=os.environ.get('INSTANCE_HOME', directory(me, 4))
 
     zeo_pid=os.environ.get('ZEO_SERVER_PID',
-                           os.path.join(var, 'ZEO_SERVER.pid')
+                           os.path.join(INSTANCE_HOME, 'var', 'ZEO_SERVER.pid')
                            )
 
-    opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
-
-    fs=os.path.join(var, 'Data.fs')
+    fs=os.path.join(INSTANCE_HOME, 'var', 'Data.fs')
 
     usage="""%s [options] [filename]
 
@@ -156,17 +149,14 @@
 
        -D -- Run in debug mode
 
-       -d -- Generate detailed debug logging without running
-             in the foreground.
-
        -U -- Unix-domain socket file to listen on
     
        -u username or uid number
 
          The username to run the ZEO server as. You may want to run
          the ZEO server as 'nobody' or some other user with limited
-         resouces. The only works under Unix, and if the storage
-         server is started by root.
+         resouces. The only works under Unix, and if ZServer is
+         started by root.
 
        -p port -- port to listen on
 
@@ -189,23 +179,42 @@
             attr_name -- This is the name to which the storage object
               is assigned in the module.
 
+       -P file -- Run under profile and dump output to file.  Implies the
+          -s flag.
+
     if no file name is specified, then %s is used.
     """ % (me, fs)
 
+    try:
+        opts, args = getopt.getopt(args, 'p:Dh:U:sS:u:P:')
+    except getopt.error, msg:
+        print usage
+        print msg
+        sys.exit(1)
+    
     port=None
-    debug=detailed=0
+    debug=0
     host=''
     unix=None
     Z=1
     UID='nobody'
+    prof = None
     for o, v in opts:
         if o=='-p': port=string.atoi(v)
         elif o=='-h': host=v
         elif o=='-U': unix=v
         elif o=='-u': UID=v
         elif o=='-D': debug=1
-        elif o=='-d': detailed=1
         elif o=='-s': Z=0
+        elif o=='-P': prof = v
+
+    if prof:
+        Z = 0
+
+    try:
+        from ZServer.medusa import asyncore
+        sys.modules['asyncore']=asyncore
+    except: pass
 
     if port is None and unix is None:
         print usage
@@ -219,10 +228,9 @@
             sys.exit(1)
         fs=args[0]
 
+    __builtins__.__debug__=debug
     if debug: os.environ['Z_DEBUG_MODE']='1'
 
-    if detailed: os.environ['STUPID_LOG_SEVERITY']='-99999'
-
     from zLOG import LOG, INFO, ERROR
 
     # Try to set uid to "-u" -provided uid.
@@ -263,71 +271,54 @@
             import zdaemon
             zdaemon.run(sys.argv, '')
 
-    try:
-
-        import ZEO.StorageServer, asyncore
-
-        storages={}
-        for o, v in opts:
-            if o=='-S':
-                n, m = string.split(v,'=')
-                if string.find(m,':'):
-                    # we got an attribute name
-                    m, a = string.split(m,':')
-                else:
-                    # attribute name must be same as storage name
-                    a=n
-                storages[n]=get_storage(m,a)
-
-        if not storages:
-            import ZODB.FileStorage
-            storages['1']=ZODB.FileStorage.FileStorage(fs)
-
-        # Try to set up a signal handler
-        try:
-            import signal
-
-            signal.signal(signal.SIGTERM,
-                          lambda sig, frame, s=storages: shutdown(s)
-                          )
-            signal.signal(signal.SIGINT,
-                          lambda sig, frame, s=storages: shutdown(s, 0)
-                          )
-            try: signal.signal(signal.SIGHUP, rotate_logs_handler)
-            except: pass
-
-        except: pass
-
-        items=storages.items()
-        items.sort()
-        for kv in items:
-            LOG('ZEO Server', INFO, 'Serving %s:\t%s' % kv)
-
-        if not unix: unix=host, port
-
-        ZEO.StorageServer.StorageServer(unix, storages)
-
-        try: ppid, pid = os.getppid(), os.getpid()
-        except: pass # getpid not supported
-        else: open(zeo_pid,'w').write("%s %s" % (ppid, pid))
-
-    except:
-        # Log startup exception and tell zdaemon not to restart us.
-        info=sys.exc_info()
-        try:
-            import zLOG
-            zLOG.LOG("z2", zLOG.PANIC, "Startup exception",
-                     error=info)
-        except:
-            pass
-
-        import traceback
-        apply(traceback.print_exception, info)
-            
-        sys.exit(0)
+    storages={}
+    for o, v in opts:
+        if o=='-S':
+            n, m = string.split(v,'=')
+            if string.find(m,':'):
+                # we got an attribute name
+                m, a = string.split(m,':')
+            else:
+                # attribute name must be same as storage name
+                a=n
+            storages[n]=get_storage(m,a)
+
+    if not storages:
+        import ZODB.FileStorage
+        storages['1']=ZODB.FileStorage.FileStorage(fs)
 
-    asyncore.loop()
+    # Try to set up a signal handler
+    try:
+        import signal
 
+        signal.signal(signal.SIGTERM,
+                      lambda sig, frame, s=storages: shutdown(s)
+                      )
+        signal.signal(signal.SIGINT,
+                      lambda sig, frame, s=storages: shutdown(s, 0)
+                      )
+        signal.signal(signal.SIGHUP, rotate_logs_handler)
+
+    finally: pass
+
+    items=storages.items()
+    items.sort()
+    for kv in items:
+        LOG('ZEO Server', INFO, 'Serving %s:\t%s' % kv)
+
+    if not unix: unix=host, port
+
+    if prof:
+        cmds = \
+        "StorageServer.StorageServer(unix, storages);" \
+        'open(zeo_pid,"w").write("%s %s" % (os.getppid(), os.getpid()));' \
+        "asyncore.loop()"
+        import profile
+        profile.run(cmds, prof)
+    else:
+        StorageServer.StorageServer(unix, storages)
+        open(zeo_pid,'w').write("%s %s" % (os.getppid(), os.getpid()))
+        asyncore.loop()
 
 def rotate_logs():
     import zLOG
@@ -335,10 +326,7 @@
         zLOG.log_write.reinitialize()
     else:
         # Hm, lets at least try to take care of the stupid logger:
-        if hasattr(zLOG, '_set_stupid_dest'):
-            zLOG._set_stupid_dest(None)
-        else:
-            zLOG._stupid_dest = None
+        zLOG._stupid_dest=None
 
 def rotate_logs_handler(signum, frame):
     rotate_logs()
@@ -359,7 +347,7 @@
 
     for storage in storages.values():
         try: storage.close()
-        except: pass
+        finally: pass
 
     try:
         from zLOG import LOG, INFO