[Zope-Checkins] CVS: ZODB3/zdaemon - zdaemon.py:1.8

Guido van Rossum guido@python.org
Mon, 11 Nov 2002 16:41:07 -0500


Update of /cvs-repository/ZODB3/zdaemon
In directory cvs.zope.org:/tmp/cvs-serv26742

Modified Files:
	zdaemon.py 
Log Message:
More refactoring: we no longer perform any blocking operations on
sockets (except send() calls).

Also cleaned up some error messages, using %r in more places.


=== ZODB3/zdaemon/zdaemon.py 1.7 => 1.8 ===
--- ZODB3/zdaemon/zdaemon.py:1.7	Mon Nov 11 15:17:45 2002
+++ ZODB3/zdaemon/zdaemon.py	Mon Nov 11 16:41:07 2002
@@ -52,13 +52,14 @@
 """
 XXX TO DO
 
-- Refactor the readcommand() to avoid blocking recv() calls.
-
 - Rethink client commands; maybe start/restart/stop make more sense?
   (Still need a way to send an arbitrary signal)
 
 - Do the governor without actual sleeps, using event scheduling etc.
 
+- True OO design -- use multiple classes rather than folding
+  everything into one class.
+
 - Add docstrings.
 
 """
@@ -115,7 +116,7 @@
                 try:
                     self.backofflimit = float(a)
                 except:
-                    self.usage("invalid number: %s" % repr(a))
+                    self.usage("invalid number: %r" % a)
             if o == "-c":
                 self.isclient += 1
             if o == "-d":
@@ -139,10 +140,10 @@
         try:
             sock.connect(self.sockname)
         except socket.error, msg:
-            self.errwrite("Can't connect to %s: %s\n" %
-                          (repr(self.sockname), str(msg)))
+            self.errwrite("Can't connect to %r: %s\n" % (self.sockname, msg))
             self.exit(1)
         sock.send(self.command + "\n")
+        sock.shutdown(1) # We're not writing any more
         lastdata = ""
         while 1:
             data = sock.recv(1000)
@@ -161,8 +162,7 @@
             self.usage("missing 'program' argument")
         self.filename = self.checkcommand(args[0])
         self.args = args # A list of strings like for execvp()
-        self.info("filename=%s; args=%s" %
-                  (repr(self.filename), repr(self.args)))
+        self.info("filename=%r; args=%r" % (self.filename, self.args))
 
     def checkcommand(self, command):
         if "/" in command:
@@ -170,7 +170,7 @@
             try:
                 st = os.stat(filename)
             except os.error:
-                self.usage("can't stat program %s" % repr(command))
+                self.usage("can't stat program %r" % command)
         else:
             path = self.getpath()
             for dir in path:
@@ -183,10 +183,10 @@
                 if mode & 0111:
                     break
             else:
-                self.usage("can't find program %s on PATH %s" %
-                           (repr(command), path))
+                self.usage("can't find program %r on PATH %s" %
+                           (command, path))
         if not os.access(filename, os.X_OK):
-            self.usage("no permission to run program %s" % repr(filename))
+            self.usage("no permission to run program %r" % filename)
         return filename
 
     def getpath(self):
@@ -213,24 +213,43 @@
             except os.error:
                 pass
 
-    controlsocket = None
+    mastersocket = None
+    commandsocket = None
 
     def opensocket(self):
+        self.checkopen()
         try:
             os.unlink(self.sockname)
         except os.error:
             pass
-        self.controlsocket = socket.socket(socket.AF_UNIX,
-                                           socket.SOCK_STREAM)
+        self.mastersocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
         oldumask = None
         try:
             oldumask = os.umask(077)
-            self.controlsocket.bind(self.sockname)
+            self.mastersocket.bind(self.sockname)
         finally:
             if oldumask is not None:
                 os.umask(oldumask)
-        self.controlsocket.listen(1)
-        self.controlsocket.setblocking(0)
+        self.mastersocket.listen(1)
+        self.mastersocket.setblocking(0)
+
+    def checkopen(self):
+        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        try:
+            s.connect(self.sockname)
+            s.send("status\n")
+            data = s.recv(1000)
+            s.close()
+        except socket.error:
+            pass
+        else:
+            if not data.endswith("\n"):
+                data += "\n"
+            msg = ("Another zdaemon is already up using socket %r:\n%s" %
+                   (self.sockname, data))
+            self.errwrite(msg)
+            self.panic(msg)
+            self.exit(1)
 
     def setsignals(self):
         signal.signal(signal.SIGTERM, self.sigexit)
@@ -239,12 +258,15 @@
         signal.signal(signal.SIGCHLD, self.sigchild)
 
     def sigexit(self, sig, frame):
-        self.info("daemon manager killed by signal %s(%d)" %
-                  (self.signame(sig), sig))
+        self.info("daemon manager killed by %s" % self.signame(sig))
         self.exit(1)
 
+    waitstatus = None
+
     def sigchild(self, sig, frame):
-        pass
+        pid, sts = os.waitpid(-1, os.WNOHANG)
+        if pid:
+            self.waitstatus = pid, sts
 
     def daemonize(self):
         pid = os.fork()
@@ -270,7 +292,11 @@
         while 1:
             if not self.appid:
                 self.forkandexec()
-            r, w, x = [self.controlsocket], [], []
+            if self.waitstatus:
+                self.reportstatus()
+            r, w, x = [self.mastersocket], [], []
+            if self.commandsocket:
+                r.append(self.commandsocket)
             timeout = 30
             try:
                 r, w, x = select.select(r, w, x, timeout)
@@ -278,42 +304,50 @@
                 if err[0] != errno.EINTR:
                     raise
                 r = w = x = []
-            wpid, wsts = os.waitpid(-1, os.WNOHANG)
-            if wpid != 0:
-                if wpid == self.appid:
-                    self.appid = 0
-                self.reportstatus(wpid, wsts)
-            if r:
-                self.readcommand()
-
-    conn = None
-
-    def readcommand(self):
-        try:
-            conn, addr = self.controlsocket.accept()
-            self.conn = conn
-            try:
-                data = conn.recv(1000)
-                if not data:
-                    self.sendreply("No input")
-                    return
-                line = data
-                while "\n" not in line:
-                    data = conn.recv(1000)
-                    if not data:
-                        self.sendreply("Input not terminated by newline")
-                        return
-                    line += data
-                self.docommand(line)
-            finally:
-                conn.close()
-            self.conn = None
-        except socket.error, msg:
-            self.problem("socket error: %s" % str(msg),
-                         error=sys.exc_info())
+            if self.waitstatus:
+                self.reportstatus()
+            if self.commandsocket and self.commandsocket in r:
+                try:
+                    self.dorecv()
+                except socket.error, msg:
+                    self.problem("socket.error in dorecv(): %s" % str(msg),
+                                 error=sys.exc_info())
+                    self.commandsocket = None
+            if self.mastersocket in r:
+                try:
+                    self.doaccept()
+                except socket.error, msg:
+                    self.problem("socket.error in doaccept(): %s" % str(msg),
+                                 error=sys.exc_info())
+                    self.commandsocket = None
+
+    def doaccept(self):
+        if self.commandsocket:
+            # Give up on previous command socket!
+            self.sendreply("Command superseded by new command")
+            self.commandsocket.close()
+            self.commandsocket = None
+        self.commandsocket, addr = self.mastersocket.accept()
+        self.commandbuffer = ""
+
+    def dorecv(self):
+        data = self.commandsocket.recv(1000)
+        if not data:
+            self.sendreply("Command not terminated by newline")
+            self.commandsocket.close()
+            self.commandsocket = None
+        self.commandbuffer += data
+        if "\n" in self.commandbuffer:
+            self.docommand()
+            self.commandsocket.close()
+            self.commandsocket = None
+        elif len(self.commandbuffer) > 10000:
+            self.sendreply("Command exceeds 10 KB")
+            self.commandsocket.close()
+            self.commandsocket = None
 
-    def docommand(self, line):
-        lines = line.split("\n")
+    def docommand(self):
+        lines = self.commandbuffer.split("\n")
         args = lines[0].split()
         if not args:
             self.sendreply("Empty command")
@@ -324,14 +358,14 @@
         if method:
             method(args)
         else:
-            self.sendreply("Unknown command %s" % (`args[0]`))
+            self.sendreply("Unknown command %r; 'help' for a list" % args[0])
 
     def cmd_kill(self, args):
         if args[1:]:
             try:
                 sig = int(args[1])
             except:
-                self.sendreply("Bad signal %s" % repr(args[1]))
+                self.sendreply("Bad signal %r" % args[1])
                 return
         else:
             sig = signal.SIGTERM
@@ -341,7 +375,7 @@
             try:
                 os.kill(self.appid, sig)
             except os.error, msg:
-                self.sendreply("Kill %d failed: %s" % (sig, str(msg)))
+                self.sendreply("Kill %d failed: %s" % (sig, msg))
             else:
                 self.sendreply("Signal %d sent" % sig)
 
@@ -353,8 +387,8 @@
         self.sendreply("status=%s\n" % status +
                        "manager=%d\n" % os.getpid() + 
                        "application=%d\n" % self.appid +
-                       "filename=%s\n" % repr(self.filename) +
-                       "args=%s\n" % repr(self.args))
+                       "filename=%r\n" % self.filename +
+                       "args=%r\n" % self.args)
 
     def cmd_help(self, args):
         self.sendreply(
@@ -366,16 +400,18 @@
             )
 
     def sendreply(self, msg):
-        if not msg.endswith("\n"):
-            msg = msg + "\n"
-        conn = self.conn
-        if hasattr(conn, "sendall"):
-            conn.sendall(msg)
-        else:
-            # This is quadratic, but msg is rarely more than 100 bytes :-)
-            while msg:
-                sent = conn.send(msg)
-                msg = msg[sent:]
+        try:
+            if not msg.endswith("\n"):
+                msg = msg + "\n"
+            if hasattr(self.commandsocket, "sendall"):
+                self.commandsocket.sendall(msg)
+            else:
+                # This is quadratic, but msg is rarely more than 100 bytes :-)
+                while msg:
+                    sent = self.commandsocket.send(msg)
+                    msg = msg[sent:]
+        except socket.error, msg:
+            self.problem("Error sending reply: %s" % str(msg))
 
     backoff = 0
     lasttime = None
@@ -391,7 +427,7 @@
                 if self.forever:
                     self.backoff = self.backofflimit
                 else:
-                    self.problem("restarting too often; quit")
+                    self.error("restarting too often; quit")
                     self.exit(1)
             self.info("sleep %s to avoid rapid restarts" % self.backoff)
             time.sleep(self.backoff)
@@ -417,12 +453,15 @@
             try:
                 os.execv(self.filename, self.args)
             except os.error, err:
-                self.panic("can't exec %s: %s" %
-                           (repr(self.filename), str(err)))
+                self.panic("can't exec %r: %s" % (self.filename, err))
         finally:
             os._exit(127)
 
-    def reportstatus(self, pid, sts):
+    def reportstatus(self):
+        pid, sts = self.waitstatus
+        self.waitstatus = None
+        if pid == self.appid:
+            self.appid = 0
         if os.WIFEXITED(sts):
             es = os.WEXITSTATUS(sts)
             msg = "pid %d: exit status %s" % (pid, es)
@@ -430,10 +469,10 @@
                 self.info(msg)
                 self.exit(0)
             elif es == 2:
-                self.problem(msg)
+                self.error(msg)
                 self.exit(es)
             else:
-                self.warning(msg)
+                self.problem(msg)
         elif os.WIFSIGNALED(sts):
             sig = os.WTERMSIG(sts)
             msg = ("pid %d: terminated by %s" % (pid, self.signame(sig)))
@@ -443,10 +482,10 @@
                 iscore = s & 0x80
             if iscore:
                 msg += " (core dumped)"
-            self.warning(msg)
+            self.problem(msg)
         else:
             msg = "pid %d: unknown termination cause 0x%04x" % (pid, sts)
-            self.warning(msg)
+            self.problem(msg)
 
     signames = None
 
@@ -473,7 +512,7 @@
     # Error handling
 
     def usage(self, msg):
-        self.problem(str(msg))
+        self.error(str(msg))
         self.errwrite("Error: %s\n" % str(msg))
         self.errwrite("For help, use zdaemon.py -h\n")
         self.exit(2)
@@ -498,10 +537,10 @@
     def info(self, msg):
         self.log(msg, zLOG.INFO)
 
-    def warning(self, msg):
-        self.log(msg, zLOG.WARNING)
+    def problem(self, msg):
+        self.log(msg, zLOG.PROBLEM)
 
-    def problem(self, msg, error=None):
+    def error(self, msg, error=None):
         self.log(msg, zLOG.ERROR, error)
 
     def panic(self, msg, error=None):