[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server - TaskThreads.py:1.1.2.4

Shane Hathaway shane@digicool.com
Tue, 27 Nov 2001 17:33:13 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/Server
In directory cvs.zope.org:/tmp/cvs-serv21173

Modified Files:
      Tag: Zope-3x-branch
	TaskThreads.py 
Log Message:
Better thread handling: kill the threads that aren't busy, and kill them
right away.


=== Zope3/lib/python/Zope/Server/TaskThreads.py 1.1.2.3 => 1.1.2.4 ===
 from Queue import Queue, Empty
 from thread import allocate_lock, start_new_thread
+from time import time, sleep
 
 try:
     from zLOG import LOG, ERROR
 except ImportError:
     LOG = None
+    ERROR = None
 
 
 class ITask:  # Interface
@@ -49,17 +51,19 @@
 
     def handlerThread(self, thread_no):
         threads = self.threads
-        while threads.has_key(thread_no):
-            task = self.queue.get()
-            try:
-                task.service()
-            except:
-                if LOG is None:
-                    import traceback
-                    traceback.print_exc()
-                else:
-                    LOG('ThreadedTaskDispatcher', ERROR,
-                        'Exception during task', error=sys.exc_info())
+        try:
+            while threads.get(thread_no):
+                task = self.queue.get()
+                if task is None:
+                    # Special value: kill this thread.
+                    break
+                try:
+                    task.service()
+                except:
+                    self.error('Exception during task', sys.exc_info())
+        finally:
+            try: del threads[thread_no]
+            except KeyError: pass
 
     def setThreadCount(self, count):
         mlock = self.thread_mgmt_lock
@@ -73,25 +77,40 @@
                 threads[thread_no] = 1
                 start_new_thread(self.handlerThread, (thread_no,))
                 thread_no = thread_no + 1
-            while (len(threads) > count):
-                if count == 0:
-                    threads.clear()
-                else:
-                    thread_no = threads.keys()[0]
-                    del threads[thread_no]
+            if len(threads) > count:
+                to_kill = len(threads) - count
+                for n in range(to_kill):
+                    self.queue.put(None)
         finally:
             mlock.release()
 
     def addTask(self, task):
+        if task is None:
+            raise ValueError, "No task passed to addTask()."
         try:
             task.defer()
-            self.queue.put_nowait(task)
+            self.queue.put(task)
         except:
             task.cancel()
             raise
 
+    def error(self, msg, exc=None):
+        if LOG is not None:
+            LOG('ThreadedTaskDispatcher', ERROR, msg, error=exc)
+        else:
+            sys.stderr.write(msg + '\n')
+            if exc is not None:
+                import traceback
+                traceback.print_exception(exc[0], exc[1], exc[2])
+
     def shutdown(self, cancel_pending=1):
         self.setThreadCount(0)
+        threads = self.threads
+        timeout = time() + 5  # Up to 5 seconds.
+        while threads:
+            if time() > timeout:
+                self.error("%d zombie threads still exist" % len(threads))
+            sleep(0.1)
         if cancel_pending:
             try:
                 while 1: