[Zodb-checkins] CVS: ZODB3/Tools - zeoqueue.py:1.3

Barry Warsaw barry@wooz.org
Tue, 4 Feb 2003 19:06:17 -0500

Update of /cvs-repository/ZODB3/Tools
In directory cvs.zope.org:/tmp/cvs-serv15707

Modified Files:
	zeoqueue.py
Log Message:
Lots of changes, maybe not all of them good ;)

Added a module docstring as a usage() string.  Also add --help/-h.

Rewrote two of the regexps as verbose re's for readability <wink>.
Use named groups.

Use True/False where appropriate.

Txn: Added a `voters' attribute which is a list of clients seen doing
a vote.  This is cleared on abort or finish.  Thus if we see a
transaction with a non-empty voters list, we know that that
transaction was blocked and it was the first voter that grabbed the
lock.  This client (as an address) is displayed in the report.

call_*(): Pass the client address into the call, as grep'd out of the
zrpc-conn part of the line.  Only call_tpc_vote() really cares about
the client address.

Most controversial: process the entire file, line by line.
Alternatives, if necessary:
    - go back to the original progressive skip backwards approach
    - add an incremental reader (recording file positions)
    - use as a filter

=== ZODB3/Tools/zeoqueue.py 1.2 => 1.3 ===
--- ZODB3/Tools/zeoqueue.py:1.2	Mon Dec 16 13:45:58 2002
+++ ZODB3/Tools/zeoqueue.py	Tue Feb  4 19:06:14 2003
@@ -1,29 +1,74 @@
 #! /usr/bin/env python
-"""Report on the number of currently waiting clients in the ZEO queue."""
+"""Report on the number of currently waiting clients in the ZEO queue.
+Usage: zeoqueue.py [options] logfile
+    -h / --help
+        Print this help text and exit.
+    -v
+        Verbose output
-import getopt
 import re
 import sys
 import time
+import getopt
-# pick arbitrary buffer size that isn't too big
-BUFSIZE = 8 * 1024 * 1024
-rx_time = re.compile('(\d\d\d\d-\d\d-\d\d)T(\d\d:\d\d:\d\d)')
+    True, False
+except NameError:
+    True = 1
+    False = 0
+tcre = re.compile(r"""
+    (?P<ymd>
+     \d{4}-      # year
+     \d{2}-      # month
+     \d{2})      # day
+    T            # separator
+    (?P<hms>
+     \d{2}:      # hour
+     \d{2}:      # minute
+     \d{2})      # second
+     """, re.VERBOSE)
+ccre = re.compile(r"""
+    zrpc-conn:(?P<addr>\d+.\d+.\d+.\d+:\d+)\s+
+    calling\s+
+    (?P<method>
+     \w+)        # the method
+    \(           # args open paren
+      \'         # string quote start
+        (?P<tid>
+         \S+)    # first argument -- usually the tid
+      \'         # end of string
+    (?P<rest>
+     .*)         # rest of line
+    """, re.VERBOSE)
+wcre = re.compile(r"""Clients waiting: (?P<num>\d+)""")
 def parse_time(line):
     """Return the time portion of a zLOG line in seconds or None."""
-    mo = rx_time.match(line)
+    mo = tcre.match(line)
     if mo is None:
         return None
-    date, time_ = mo.group(1, 2)
+    date, time_ = mo.group('ymd', 'hms')
     date_l = [int(elt) for elt in date.split('-')]
     time_l = [int(elt) for elt in time_.split(':')]
     return int(time.mktime(date_l + time_l + [0, 0, 0]))
 class Txn:
     """Track status of single transaction."""
     def __init__(self, tid):
         self.tid = tid
         self.hint = None
@@ -31,13 +76,16 @@
         self.vote = None
         self.abort = None
         self.finish = None
+        self.voters = []
     def isactive(self):
         if self.begin and not (self.abort or self.finish):
-            return 1
+            return True
-            return 0
+            return False
 class Status:
     """Track status of ZEO server by replaying log records.
@@ -46,6 +94,7 @@
     - The last committed transaction.
     - The last committed or aborted transaction.
     - The last transaction that got the lock but didn't finish.
+    - The client address doing the first vote of a transaction.
     - The number of currently active transactions.
     - The number of reported queued transactions.
     - Client restarts.
@@ -78,6 +127,7 @@
     def reset(self):
         self.commit = None
         self.commit_or_abort = None
+        self.last_unfinished = None
         self.n_active = 0
         self.n_blocked = 0
         self.n_conns = 0
@@ -88,88 +138,58 @@
         # The status report will always be complete if we encounter an
         # explicit restart.
         if self.t_restart is not None:
-            return 1
+            return True
         # If we haven't seen a restart, assume that seeing a finished
         # transaction is good enough.
         return self.commit is not None
-    def report(self):
-        print "Blocked transactions:", self.n_blocked
-        if not VERBOSE:
-            return
-        if self.t_restart:
-            print "Server started:", time.ctime(self.t_restart)
-        if self.commit is not None:
-            t = self.commit_or_abort.finish
-            if t is None:
-                t = self.commit_or_abort.abort
-            print "Last finished transaction:", time.ctime(t)
-        # the blocked transaction should be the first one that calls vote
-        L = [(txn.begin, txn) for txn in self.txns.values()]
-        L.sort()
-        for x, txn in L:
-            if txn.isactive():
-                began = txn.begin
-                print "Blocked transaction began at:", time.ctime(began)
-                print "Hint:", txn.hint
-                print "Idle time: %d sec" % int(time.time() - began)
-                break
     def process(self, line):
         if line.find("calling") != -1:
         elif line.find("connect") != -1:
+        # test for "locked" because word may start with "B" or "b"
         elif line.find("locked") != -1:
-            # test for "locked" because word may start with "B" or "b"
         elif line.find("Starting") != -1:
-    rx_call = re.compile("calling (\w+)\(\'(\S+)\'(.*)")
     def process_call(self, line):
-        mo = self.rx_call.search(line)
+        mo = ccre.search(line)
         if mo is None:
-        called_method = mo.group(1)
+        called_method = mo.group('method')
         # XXX exit earlier if we've got zeoLoad, because it's the most
         # frequently called method and we don't use it.
         if called_method == "zeoLoad":
         t = parse_time(line)
         meth = getattr(self, "call_%s" % called_method, None)
         if meth is None:
-        tid = mo.group(2)
-        rest = mo.group(3)
-        meth(t, tid, rest)
+        client = mo.group('addr')
+        tid = mo.group('tid')
+        rest = mo.group('rest')
+        meth(t, client, tid, rest)
     def process_connect(self, line):
-    rx_waiting = re.compile("Clients waiting: (\d+)")
     def process_block(self, line):
-        mo = self.rx_waiting.search(line)
+        mo = wcre.search(line)
         if mo is None:
             # assume that this was a restart message for the last blocked
             # transaction.
             self.n_blocked = 0
-            self.n_blocked = int(mo.group(1))
+            self.n_blocked = int(mo.group('num'))
     def process_start(self, line):
         if line.find("Starting ZEO server") != -1:
             self.t_restart = parse_time(line)
-    def call_tpc_begin(self, t, tid, rest):
+    def call_tpc_begin(self, t, client, tid, rest):
         txn = Txn(tid)
         txn.begin = t
         if rest[0] == ',':
@@ -180,22 +200,24 @@
         txn.hint = rest
         self.txns[tid] = txn
         self.n_active += 1
+        self.last_unfinished = txn
-    def call_vote(self, t, tid, rest):
+    def call_vote(self, t, client, tid, rest):
         txn = self.txns.get(tid)
         if txn is None:
             print "Oops!"
             txn = self.txns[tid] = Txn(tid)
         txn.vote = t
+        txn.voters.append(client)
-    def call_tpc_abort(self, t, tid, rest):
+    def call_tpc_abort(self, t, client, tid, rest):
         txn = self.txns.get(tid)
         if txn is None:
             print "Oops!"
             txn = self.txns[tid] = Txn(tid)
         txn.abort = t
+        txn.voters = []
         self.n_active -= 1
         if self.commit_or_abort:
             # delete the old transaction
@@ -204,14 +226,14 @@
         self.commit_or_abort = txn
-    def call_tpc_finish(self, t, tid, rest):
+    def call_tpc_finish(self, t, client, tid, rest):
         txn = self.txns.get(tid)
         if txn is None:
             print "Oops!"
             txn = self.txns[tid] = Txn(tid)
         txn.finish = t
+        txn.voters = []
         self.n_active -= 1
         if self.commit:
             # delete the old transaction
@@ -226,38 +248,72 @@
         self.commit = self.commit_or_abort = txn
-def process_from(f, pos):
-    s = Status()
-    f.seek(-pos, 2)
-    f.readline()
-    for line in f.readlines(BUFSIZE):
-        s.process(line)
-    return s
+    def report(self):
+        print "Blocked transactions:", self.n_blocked
+        if not VERBOSE:
+            return
+        if self.t_restart:
+            print "Server started:", time.ctime(self.t_restart)
+        if self.commit is not None:
+            t = self.commit_or_abort.finish
+            if t is None:
+                t = self.commit_or_abort.abort
+            print "Last finished transaction:", time.ctime(t)
+        # the blocked transaction should be the first one that calls vote
+        L = [(txn.begin, txn) for txn in self.txns.values()]
+        L.sort()
+        for x, txn in L:
+            if txn.isactive():
+                began = txn.begin
+                if txn.voters:
+                    print "Blocked client (first vote):", txn.voters[0]
+                print "Blocked transaction began at:", time.ctime(began)
+                print "Hint:", txn.hint
+                print "Idle time: %d sec" % int(time.time() - began)
+                break
+def usage(code, msg=''):
+    print >> sys.stderr, __doc__
+    if msg:
+        print >> sys.stderr, msg
+    sys.exit(code)
 def main():
     global VERBOSE
     VERBOSE = 0
-    opts, args = getopt.getopt(sys.argv[1:], 'v')
-    for k, v in opts:
-        if k == '-v':
+    try:
+        opts, args = getopt.getopt(sys.argv[1:], 'vh', ['help'])
+    except getopt.error, msg:
+        usage(1, msg)
+    for opt, arg in opts:
+        if opt in ('-h', '--help'):
+            usage(0)
+        elif opt == '-v':
             VERBOSE += 1
+    if not args:
+        usage(1, 'logfile is required')
+    if len(args) > 1:
+        usage(1, 'too many arguments: %s' % COMMASPACE.join(args))
     path = args[0]
     f = open(path, "rb")
-    # Start at pos bytes from the end of the file and read forwards.
-    # If we read enough log data to have a complete snapshot of the
-    # server state, stop and print a report.  If not, move twice as
-    # far from the end of the file and repeat.
-    pos = 16 * 1024
-    while 1:
-        s = process_from(f, pos)
-        if s.iscomplete():
+    s = Status()
+    while True:
+        line = f.readline()
+        if not line:
-        pos *= 2
+        s.process(line)
 if __name__ == "__main__":