[Zope-Checkins] CVS: ZODB3/ZEO - monitor.py:1.1 StorageServer.py:1.89 runsvr.py:1.27 schema.xml:1.5

Jeremy Hylton jeremy@zope.com
Thu, 9 Jan 2003 16:50:51 -0500


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv26741

Modified Files:
	StorageServer.py runsvr.py schema.xml 
Added Files:
	monitor.py 
Log Message:
Add provisional monitor server that reports server statistics

Also, remove unused reuse_addr arg to ZEO.zrpc.server.  The server was
always calling set_reuse_addr().

No tests yet, that's the next step.  Simple functional tests work.



=== Added File ZODB3/ZEO/monitor.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Monitor behavior of ZEO server and record statistics.

$Id$
"""

import asyncore
import socket
import time
import types

import ZEO

class StorageStats:
    """Usage counters kept for a single storage.

    Every counter starts at zero.  lock_time starts as None; dump()
    only reports it while it holds a true value, treating it as a
    time.time() timestamp.
    """

    def __init__(self):
        # Plain integer counters, all starting from zero.
        for counter in ("loads", "stores", "commits", "aborts",
                        "active_txns", "clients", "verifying_clients",
                        "conflicts", "conflicts_resolved"):
            setattr(self, counter, 0)
        # Not a counter: None, or the timestamp dump() measures from.
        self.lock_time = None

    def dump(self, f):
        """Print one "label value" line per statistic to file-like f."""
        for label, attr in (("Clients:", "clients"),
                            ("Clients verifying:", "verifying_clients"),
                            ("Active transactions:", "active_txns")):
            print >> f, label, getattr(self, attr)

        # This line is only emitted while lock_time is set; the value is
        # the whole number of seconds elapsed since lock_time.
        if self.lock_time:
            print >> f, "Commit lock held for:", \
                  int(time.time() - self.lock_time)

        for label, attr in (("Commits:", "commits"),
                            ("Aborts:", "aborts"),
                            ("Loads:", "loads"),
                            ("Stores:", "stores"),
                            ("Conflicts:", "conflicts"),
                            ("Conflicts resolved:", "conflicts_resolved")):
            print >> f, label, getattr(self, attr)

class StatsClient(asyncore.dispatcher):
    """Write-only connection that delivers a buffered report to one client.

    The object is used as a file-like target: write() queues strings,
    and the asyncore loop drains the queue through handle_write().
    close() is deferred: the socket is really closed only once every
    queued byte has been sent.
    """

    def __init__(self, sock, addr):
        # addr is accepted for the connection-factory signature but unused.
        asyncore.dispatcher.__init__(self, sock)
        self.buf = []       # strings queued by write(), not yet sent
        self.closed = 0     # set by close(); real close happens later

    def close(self):
        """Request that the connection be closed once the queue is empty."""
        self.closed = 1
        # Normally the socket is closed after all the data is written;
        # see handle_write().  With nothing queued, writable() is false
        # and handle_write() would never run, so close right away
        # instead of leaving the socket open forever.
        if not self.buf:
            asyncore.dispatcher.close(self)

    def write(self, s):
        """Queue the string s for sending."""
        self.buf.append(s)

    def writable(self):
        # Only ask to be polled for writing while there is queued data.
        return len(self.buf)

    def readable(self):
        # XXX what goes here?
        # Nothing is ever read from the client, so never poll for reading.
        return 0

    def handle_write(self):
        """Send as much queued data as the socket accepts right now."""
        s = "".join(self.buf)
        self.buf = []
        n = self.socket.send(s)
        if n < len(s):
            # Partial send: keep the part that was NOT sent (s[n:]) so
            # the next call continues where this one stopped.
            self.buf.append(s[n:])

        if self.closed and not self.buf:
            asyncore.dispatcher.close(self)

class StatsServer(asyncore.dispatcher):
    """Listening socket that sends a statistics report to each client.

    Every accepted connection is wrapped in StatsConnectionClass, gets
    the output of dump() written to it, and is then closed.
    """

    # Class used to wrap accepted sockets; it is called as
    # StatsConnectionClass(sock, addr) and must offer write() and close().
    StatsConnectionClass = StatsClient

    def __init__(self, addr, stats):
        """Create the listening socket.

        addr -- a tuple is treated as an AF_INET address; anything
            else is treated as an AF_UNIX address.
        stats -- mapping whose values provide a dump(f) method; dump()
            reports it in sorted key order.
        """
        asyncore.dispatcher.__init__(self)
        self.addr = addr
        self.stats = stats
        if isinstance(self.addr, types.TupleType):
            family = socket.AF_INET
        else:
            family = socket.AF_UNIX
        self.create_socket(family, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(self.addr)
        self.listen(5)

    def writable(self):
        # The listening socket itself never has anything to send.
        return 0

    def readable(self):
        # Always poll for reading so incoming connections are noticed.
        return 1

    def handle_accept(self):
        """Accept one connection, send it the report, then close it."""
        try:
            pair = self.accept()
        except socket.error:
            return
        if pair is None:
            # asyncore's accept() can return None instead of a
            # (sock, addr) pair when no connection took place;
            # unpacking that would raise TypeError.
            return
        sock, addr = pair
        f = self.StatsConnectionClass(sock, addr)
        try:
            self.dump(f)
        finally:
            # Always request the close, even if dump() fails part way,
            # so the accepted connection is not left open.
            f.close()

    def dump(self, f):
        """Print the full report to file-like f.

        The report is a header (version line, current time, blank
        line) followed by one section per storage, in sorted key
        order, each ending with a blank line.
        """
        print >> f, "ZEO monitor server version %s" % ZEO.version
        print >> f, time.ctime()
        print >> f

        names = self.stats.keys()
        names.sort()
        for name in names:
            print >> f, "Storage:", name
            self.stats[name].dump(f)
            print >> f


=== ZODB3/ZEO/StorageServer.py 1.88 => 1.89 ===
--- ZODB3/ZEO/StorageServer.py:1.88	Thu Jan  9 13:45:08 2003
+++ ZODB3/ZEO/StorageServer.py	Thu Jan  9 16:50:18 2003
@@ -31,13 +31,15 @@
 
 from ZEO import ClientStub
 from ZEO.CommitLog import CommitLog
+from ZEO.monitor import StorageStats, StatsServer
 from ZEO.zrpc.server import Dispatcher
 from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay
 from ZEO.zrpc.trigger import trigger
 
 import zLOG
+from ZODB.ConflictResolution import ResolvedSerial
 from ZODB.POSException import StorageError, StorageTransactionError
-from ZODB.POSException import TransactionError, ReadOnlyError
+from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
 from ZODB.referencesf import referencesf
 from ZODB.Transaction import Transaction
 from ZODB.utils import u64
@@ -65,7 +67,9 @@
 
     def __init__(self, server, read_only=0):
         self.server = server
+        # timeout and stats will be initialized in register()
         self.timeout = None
+        self.stats = None
         self.connection = None
         self.client = None
         self.storage = None
@@ -73,6 +77,7 @@
         self.transaction = None
         self.read_only = read_only
         self.locked = 0
+        self.verifying = 0
         self.log_label = _label
 
     def notifyConnected(self, conn):
@@ -94,6 +99,8 @@
             self._abort()
         else:
             self.log("disconnected")
+        if self.stats is not None:
+            self.stats.clients -= 1
 
     def __repr__(self):
         tid = self.transaction and repr(self.transaction.id)
@@ -130,7 +137,7 @@
                     setattr(self, name, getattr(self.storage, name))
         self.lastTransaction = self.storage.lastTransaction
 
-    def check_tid(self, tid, exc=None):
+    def _check_tid(self, tid, exc=None):
         if self.read_only:
             raise ReadOnlyError()
         caller = sys._getframe().f_back.f_code.co_name
@@ -150,6 +157,18 @@
                 return 0
         return 1
 
+    # _lock() and _unlock() control the locked flag
+
+    def _lock(self):
+        self.locked = 1
+        self.timeout.begin(self)
+        self.stats.lock_time = time.time()
+
+    def _unlock(self):
+        self.locked = 0
+        self.timeout.end(self)
+        self.stats.lock_time = None
+
     def register(self, storage_id, read_only):
         """Select the storage that this client will use
 
@@ -170,7 +189,8 @@
         self.storage_id = storage_id
         self.storage = storage
         self.setup_delegation()
-        self.timeout = self.server.register_connection(storage_id, self)
+        self.timeout, self.stats = self.server.register_connection(storage_id,
+                                                                   self)
 
     def get_info(self):
         return {'length': len(self.storage),
@@ -197,6 +217,7 @@
             return e()
 
     def zeoLoad(self, oid):
+        self.stats.loads += 1
         v = self.storage.modifiedInVersion(oid)
         if v:
             pv, sv = self.storage.load(oid, v)
@@ -221,6 +242,9 @@
         return invtid, invlist
 
     def zeoVerify(self, oid, s, sv):
+        if not self.verifying:
+            self.verifying = 1
+            self.stats.verifying_clients += 1
         try:
             os = self.storage.getSerial(oid)
         except KeyError:
@@ -251,6 +275,9 @@
                     self.client.invalidateVerify((oid, ''))
 
     def endZeoVerify(self):
+        if self.verifying:
+            self.stats.verifying_clients -= 1
+        self.verifying = 0
         self.client.endVerify()
 
     def pack(self, time, wait=1):
@@ -320,31 +347,34 @@
         self.txnlog = CommitLog()
         self.tid = tid
         self.status = status
+        self.stats.active_txns += 1
 
     def tpc_finish(self, id):
-        if not self.check_tid(id):
+        if not self._check_tid(id):
             return
         assert self.locked
+        self.stats.active_txns -= 1
+        self.stats.commits += 1
         self.storage.tpc_finish(self.transaction)
         tid = self.storage.lastTransaction()
         if self.invalidated:
             self.server.invalidate(self, self.storage_id, tid,
                                    self.invalidated, self.get_size_info())
         self.transaction = None
-        self.locked = 0
-        self.timeout.end(self)
+        self._unlock()
         # Return the tid, for cache invalidation optimization
         self._handle_waiting()
         return tid
 
     def tpc_abort(self, id):
-        if not self.check_tid(id):
+        if not self._check_tid(id):
             return
+        self.stats.active_txns -= 1
+        self.stats.aborts += 1
         if self.locked:
             self.storage.tpc_abort(self.transaction)
         self.transaction = None
-        self.locked = 0
-        self.timeout.end(self)
+        self._unlock()
         self._handle_waiting()
 
     def _abort(self):
@@ -361,6 +391,8 @@
                     break
 
         if self.transaction:
+            self.stats.active_txns -= 1
+            self.stats.aborts += 1
             self.tpc_abort(self.transaction.id)
 
     # The public methods of the ZEO client API do not do the real work.
@@ -369,44 +401,44 @@
     # an _.
 
     def storea(self, oid, serial, data, version, id):
-        self.check_tid(id, exc=StorageTransactionError)
+        self._check_tid(id, exc=StorageTransactionError)
+        self.stats.stores += 1
         self.txnlog.store(oid, serial, data, version)
 
     # The following four methods return values, so they must acquire
     # the storage lock and begin the transaction before returning.
 
     def vote(self, id):
-        self.check_tid(id, exc=StorageTransactionError)
+        self._check_tid(id, exc=StorageTransactionError)
         if self.locked:
             return self._vote()
         else:
             return self._wait(lambda: self._vote())
 
     def abortVersion(self, src, id):
-        self.check_tid(id, exc=StorageTransactionError)
+        self._check_tid(id, exc=StorageTransactionError)
         if self.locked:
             return self._abortVersion(src)
         else:
             return self._wait(lambda: self._abortVersion(src))
 
     def commitVersion(self, src, dest, id):
-        self.check_tid(id, exc=StorageTransactionError)
+        self._check_tid(id, exc=StorageTransactionError)
         if self.locked:
             return self._commitVersion(src, dest)
         else:
             return self._wait(lambda: self._commitVersion(src, dest))
 
     def transactionalUndo(self, trans_id, id):
-        self.check_tid(id, exc=StorageTransactionError)
+        self._check_tid(id, exc=StorageTransactionError)
         if self.locked:
             return self._transactionalUndo(trans_id)
         else:
             return self._wait(lambda: self._transactionalUndo(trans_id))
 
     def _tpc_begin(self, txn, tid, status):
-        self.locked = 1
+        self._lock()
         self.storage.tpc_begin(txn, tid, status)
-        self.timeout.begin(self)
 
     def _store(self, oid, serial, data, version):
         try:
@@ -415,6 +447,8 @@
         except (SystemExit, KeyboardInterrupt):
             raise
         except Exception, err:
+            if isinstance(err, ConflictError):
+                self.stats.conflicts += 1
             if not isinstance(err, TransactionError):
                 # Unexpected errors are logged and passed to the client
                 exc_info = sys.exc_info()
@@ -436,6 +470,8 @@
         else:
             if serial != "\0\0\0\0\0\0\0\0":
                 self.invalidated.append((oid, version))
+        if newserial == ResolvedSerial:
+            self.stats.conflicts_resolved += 1
         self.serials.append((oid, newserial))
 
     def _vote(self):
@@ -543,7 +579,8 @@
 
     def __init__(self, addr, storages, read_only=0,
                  invalidation_queue_size=100,
-                 transaction_timeout=None):
+                 transaction_timeout=None,
+                 monitor_address=None):
         """StorageServer constructor.
 
         This is typically invoked from the start.py script.
@@ -580,6 +617,11 @@
             a transaction to commit after acquiring the storage lock.
             If the transaction takes too long, the client connection
             will be closed and the transaction aborted.
+
+        monitor_address -- The address at which the monitor server
+            should listen.  If specified, a monitor server is started.
+            The monitor server provides server statistics in a simple
+            text format. 
         """
 
         self.addr = addr
@@ -599,10 +641,11 @@
         self.invq_bound = invalidation_queue_size
         self.connections = {}
         self.dispatcher = self.DispatcherClass(addr,
-                                               factory=self.new_connection,
-                                               reuse_addr=1)
+                                               factory=self.new_connection)
+        self.stats = {}
         self.timeouts = {}
         for name in self.storages.keys():
+            self.stats[name] = StorageStats()
             if transaction_timeout is None:
                 # An object with no-op methods
                 timeout = StubTimeoutThread()
@@ -610,6 +653,10 @@
                 timeout = TimeoutThread(transaction_timeout)
                 timeout.start()
             self.timeouts[name] = timeout
+        if monitor_address:
+            self.monitor = StatsServer(monitor_address, self.stats)
+        else:
+            self.monitor = None
 
     def new_connection(self, sock, addr):
         """Internal: factory to create a new connection.
@@ -633,13 +680,15 @@
         is needed to handle invalidation.  This function updates this
         dictionary.
 
-        Returns the timeout object for the appropriate storage.
+        Returns the timeout and stats objects for the appropriate storage.
         """
         l = self.connections.get(storage_id)
         if l is None:
             l = self.connections[storage_id] = []
         l.append(conn)
-        return self.timeouts[storage_id]
+        stats = self.stats[storage_id]
+        stats.clients += 1
+        return self.timeouts[storage_id], stats
 
     def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
         """Internal: broadcast info and invalidations to clients.
@@ -707,6 +756,8 @@
         for timeout in self.timeouts.values():
             timeout.stop()
         self.dispatcher.close()
+        if self.monitor is not None:
+            self.monitor.close()
         for storage in self.storages.values():
             storage.close()
         # Force the asyncore mainloop to exit by hackery, i.e. close


=== ZODB3/ZEO/runsvr.py 1.26 => 1.27 ===
--- ZODB3/ZEO/runsvr.py:1.26	Thu Jan  9 15:40:22 2003
+++ ZODB3/ZEO/runsvr.py	Thu Jan  9 16:50:18 2003
@@ -22,6 +22,7 @@
                         (a PATH must contain at least one "/")
 -f/--filename FILENAME -- filename for FileStorage
 -h/--help -- print this usage message and exit
+-m/--monitor ADDRESS -- address of monitor server
 
 Unless -C is specified, -a and -f are required.
 """
@@ -147,43 +148,62 @@
         sys.stderr.write("For help, use %s -h\n" % self.progname)
         sys.exit(2)
 
+def parse_address(arg):
+    if "/" in arg:
+        family = socket.AF_UNIX
+        address = arg
+    else:
+        family = socket.AF_INET
+        if ":" in arg:
+            host, port = arg.split(":", 1)
+        else:
+            host = ""
+            port = arg
+        try:
+            port = int(port)
+        except: # int() can raise all sorts of errors
+            raise ValueError("invalid port number: %r" % port)
+        address = host, port
+    return family, address
 
 class ZEOOptions(Options):
 
     read_only = None
     transaction_timeout = None
     invalidation_queue_size = None
+    monitor_address = None
 
     family = None                       # set by -a; AF_UNIX or AF_INET
     address = None                      # set by -a; string or (host, port)
     storages = None                     # set by -f
 
-    _short_options = "a:C:f:h"
+    _short_options = "a:C:f:hm:"
     _long_options = [
         "address=",
         "configuration=",
         "filename=",
         "help",
+        "monitor=",
         ]
 
     def handle_option(self, opt, arg):
         # Alphabetical order please!
         if opt in ("-a", "--address"):
-            if "/" in arg:
-                self.family = socket.AF_UNIX
-                self.address = arg
+            try:
+                f, a = parse_address(arg)
+            except ValueError, err:
+                self.usage(str(err))
+            else:
+                self.family = f
+                self.address = a
+        elif opt in ("-m", "--monitor"):
+            try:
+                f, a = parse_address(arg)
+            except ValueError, err:
+                self.usage(str(err))
             else:
-                self.family = socket.AF_INET
-                if ":" in arg:
-                    host, port = arg.split(":", 1)
-                else:
-                    host = ""
-                    port = arg
-                try:
-                    port = int(port)
-                except: # int() can raise all sorts of errors
-                    self.usage("invalid port number: %r" % port)
-                self.address = (host, port)
+                self.monitor_family = f
+                self.monitor_address = a
         elif opt in ("-f", "--filename"):
             from ZODB.config import FileStorage
             class FSConfig:
@@ -238,7 +258,7 @@
 
         self.read_only = self.rootconf.read_only
         self.transaction_timeout = self.rootconf.transaction_timeout
-        self.invalidation_queue_size = self.rootconf.invalidation_queue_size
+        self.invalidation_queue_size = 100
 
     def load_logconf(self):
         # Get logging options from conf, unless overridden by environment
@@ -349,7 +369,8 @@
             self.storages,
             read_only=self.options.read_only,
             invalidation_queue_size=self.options.invalidation_queue_size,
-            transaction_timeout=self.options.transaction_timeout)
+            transaction_timeout=self.options.transaction_timeout,
+            monitor_address=self.options.monitor_address)
 
     def loop_forever(self):
         import ThreadedAsync.LoopCallback


=== ZODB3/ZEO/schema.xml 1.4 => 1.5 ===
--- ZODB3/ZEO/schema.xml:1.4	Thu Jan  9 13:25:29 2003
+++ ZODB3/ZEO/schema.xml	Thu Jan  9 16:50:18 2003
@@ -58,6 +58,14 @@
     </description>
   </key>
 
+  <key name="monitor-address" datatype="socket-address" required="no">
+    <description>
+      The address at which the monitor server should listen.  If
+      specified, a monitor server is started.  The monitor server
+      provides server statistics in a simple text format.
+    </description>
+  </key>
+
   <multisection name="+" type="storage"
                 attribute="storages"
                 required="yes">