[Zope3-checkins] SVN: Zope3/trunk/src/zope/server/ Simplified serverchannelbase, hopefully fixing intermittent bugs.

Shane Hathaway shane at zope.com
Fri Sep 3 04:16:56 EDT 2004


Log message for revision 27442:
  Simplified serverchannelbase, hopefully fixing intermittent bugs.
  
  The FTP server needed a way to queue a task in the channel.  It used 
  start_task, but start_task had no way to queue tasks correctly if tasks 
  already happened to be running.  So an assertion failure resulted 
  occasionally.
  
  Now, there is a queue of tasks rather than a queue of requests.  
  Anything that needs to can send a task to a channel.  The task will be 
  executed in synchronous mode.
  
  Also, the basic request parsing is now done in the asyncore main thread.  
  When SimultaneousModeChannel was dropped, it became impossible to receive 
  data in application threads anyway.
  
  


Changed:
  U   Zope3/trunk/src/zope/server/ftp/server.py
  U   Zope3/trunk/src/zope/server/http/httptask.py
  U   Zope3/trunk/src/zope/server/interfaces/__init__.py
  U   Zope3/trunk/src/zope/server/linereceiver/lineserverchannel.py
  U   Zope3/trunk/src/zope/server/linereceiver/linetask.py
  U   Zope3/trunk/src/zope/server/serverchannelbase.py


-=-
Modified: Zope3/trunk/src/zope/server/ftp/server.py
===================================================================
--- Zope3/trunk/src/zope/server/ftp/server.py	2004-09-03 08:07:01 UTC (rev 27441)
+++ Zope3/trunk/src/zope/server/ftp/server.py	2004-09-03 08:16:55 UTC (rev 27442)
@@ -751,7 +751,7 @@
         task = FinishedRecvTask(c, self.inbuf, self.finish_args)
         self.complete_transfer = 1
         self.close()
-        c.start_task(task)
+        c.queue_task(task)
 
     def close(self, *reply_args):
         try:
@@ -791,14 +791,13 @@
                 if c.adj.log_socket_errors:
                     raise
         finally:
-            c.end_task(close_on_finish)
+            if close_on_finish:
+                c.close_when_done()
 
-
     def cancel(self):
         'See ITask'
         self.control_channel.close_when_done()
 
-
     def defer(self):
         'See ITask'
         pass

Modified: Zope3/trunk/src/zope/server/http/httptask.py
===================================================================
--- Zope3/trunk/src/zope/server/http/httptask.py	2004-09-03 08:07:01 UTC (rev 27441)
+++ Zope3/trunk/src/zope/server/http/httptask.py	2004-09-03 08:16:55 UTC (rev 27442)
@@ -76,7 +76,8 @@
                 if self.channel.adj.log_socket_errors:
                     raise
         finally:
-            self.channel.end_task(self.close_on_finish)
+            if self.close_on_finish:
+                self.channel.close_when_done()
 
     def cancel(self):
         """See zope.server.interfaces.ITask"""

Modified: Zope3/trunk/src/zope/server/interfaces/__init__.py
===================================================================
--- Zope3/trunk/src/zope/server/interfaces/__init__.py	2004-09-03 08:07:01 UTC (rev 27441)
+++ Zope3/trunk/src/zope/server/interfaces/__init__.py	2004-09-03 08:16:55 UTC (rev 27442)
@@ -280,30 +280,11 @@
     task_class = Attribute("""Specifies the ITask class to be used for
                            generating tasks.""")
 
-    active_channels = Attribute("Class-specific channel tracker")
-    next_channel_cleanup = Attribute("Class-specific cleanup time")
-
-    proto_request = Attribute("A request parser instance")
-    ready_requests = Attribute("A list of requests to be processed.")
-    last_activity = Attribute("Time of last activity")
-    running_tasks = Attribute("boolean")
-
-
-    def queue_request(self, req):
-        """Queues a request to be processed in sequence by a task.
+    def queue_task(task):
+        """Queues a channel-related task to be processed in sequence.
         """
 
-    def end_task(self, close):
-        """Called at the end of a task, may launch another task.
-        """
 
-    def create_task(self, req):
-        """Creates a new task and queues it for execution.
-
-        The task may get executed in another thread.
-        """
-
-
 class IDispatcher(ISocket, IDispatcherEventHandler, IDispatcherLogging):
     """The dispatcher is the most low-level component of a server.
 

Modified: Zope3/trunk/src/zope/server/linereceiver/lineserverchannel.py
===================================================================
--- Zope3/trunk/src/zope/server/linereceiver/lineserverchannel.py	2004-09-03 08:07:01 UTC (rev 27441)
+++ Zope3/trunk/src/zope/server/linereceiver/lineserverchannel.py	2004-09-03 08:16:55 UTC (rev 27442)
@@ -65,7 +65,7 @@
         }
 
 
-    def process_request(self, command):
+    def handle_request(self, command):
         """Processes a command.
 
         Some commands use an alternate thread.
@@ -79,7 +79,8 @@
 
         elif method in self.thread_commands:
             # Process in another thread.
-            return self.task_class(self, command, method)
+            task = self.task_class(self, command, method)
+            self.queue_task(task)
 
         elif hasattr(self, method):
             try:
@@ -88,7 +89,6 @@
                 self.exception()
         else:
             self.reply(self.unknown_reply, cmd.upper())
-        return None
 
 
     def reply(self, code, args=(), flush=1):

Modified: Zope3/trunk/src/zope/server/linereceiver/linetask.py
===================================================================
--- Zope3/trunk/src/zope/server/linereceiver/linetask.py	2004-09-03 08:07:01 UTC (rev 27441)
+++ Zope3/trunk/src/zope/server/linereceiver/linetask.py	2004-09-03 08:16:55 UTC (rev 27442)
@@ -49,7 +49,8 @@
             except:
                 self.channel.exception()
         finally:
-            self.channel.end_task(self.close_on_finish)
+            if self.close_on_finish:
+                self.channel.close_when_done()
 
     def cancel(self):
         'See ITask'

Modified: Zope3/trunk/src/zope/server/serverchannelbase.py
===================================================================
--- Zope3/trunk/src/zope/server/serverchannelbase.py	2004-09-03 08:07:01 UTC (rev 27441)
+++ Zope3/trunk/src/zope/server/serverchannelbase.py	2004-09-03 08:16:55 UTC (rev 27442)
@@ -26,16 +26,16 @@
 from zope.interface import implements
 
 from zope.server.dualmodechannel import DualModeChannel
-from zope.server.interfaces import IServerChannel
+from zope.server.interfaces import IServerChannel, ITask
 
-# Synchronize access to the "running_tasks" attributes.
-running_lock = allocate_lock()
+# task_lock is useful for synchronizing access to task-related attributes.
+task_lock = allocate_lock()
 
 
 class ServerChannelBase(DualModeChannel, object):
     """Base class for a high-performance, mixed-mode server-side channel."""
 
-    implements(IServerChannel)
+    implements(IServerChannel, ITask)
 
     # See zope.server.interfaces.IServerChannel
     parser_class = None       # Subclasses must provide a parser class
@@ -43,12 +43,10 @@
 
     active_channels = {}        # Class-specific channel tracker
     next_channel_cleanup = [0]  # Class-specific cleanup time
-
     proto_request = None      # A request parser instance
-    ready_requests = None     # A list
-    # ready_requests must always be empty when not running tasks.
     last_activity = 0         # Time of last activity
-    running_tasks = 0         # boolean: true when any task is being executed
+    tasks = None  # List of channel-related tasks to execute
+    running_tasks = False  # True when another thread is running tasks
 
     #
     # ASYNCHRONOUS METHODS (including __init__)
@@ -115,8 +113,8 @@
     def received(self, data):
         """See async.dispatcher
 
-        Receive input asynchronously and send requests to
-        receivedCompleteRequest().
+        Receives input asynchronously and send requests to
+        handle_request().
         """
         preq = self.proto_request
         while data:
@@ -125,58 +123,25 @@
             n = preq.received(data)
             if preq.completed:
                 # The request is ready to use.
+                self.proto_request = None
                 if not preq.empty:
-                    self.receivedCompleteRequest(preq)
+                    self.handle_request(preq)
                 preq = None
-                self.proto_request = None
             else:
                 self.proto_request = preq
             if n >= len(data):
                 break
             data = data[n:]
 
-    def receivedCompleteRequest(self, req):
-        """See async.dispatcher
+    def handle_request(self, req):
+        """Creates and queues a task for processing a request.
 
-        If there are tasks running or requests on hold, queue
-        the request, otherwise execute it.
+        Subclasses may override this method to handle some requests
+        immediately in the main async thread.
         """
-        do_now = 0
-        running_lock.acquire()
-        try:
-            if self.running_tasks:
-                # A task thread is working.  It will read from the queue
-                # when it is finished.
-                rr = self.ready_requests
-                if rr is None:
-                    rr = []
-                    self.ready_requests = rr
-                rr.append(req)
-            else:
-                # Do it now.
-                do_now = 1
-        finally:
-            running_lock.release()
-        if do_now:
-            task = self.process_request(req)
-            if task is not None:
-                self.start_task(task)
+        task = self.task_class(self, req)
+        self.queue_task(task)
 
-    def start_task(self, task):
-        """See async.dispatcher
-
-        Starts the given task.
-
-        *** For thread safety, this should only be called from the main
-        (async) thread. ***"""
-        if self.running_tasks:
-            # Can't start while another task is running!
-            # Otherwise two threads would work on the queue at the same time.
-            raise RuntimeError, 'Already executing tasks'
-        self.running_tasks = 1
-        self.set_sync()
-        self.server.addTask(task)
-
     def handle_error(self):
         """See async.dispatcher
 
@@ -199,52 +164,69 @@
             self.close()
 
     #
-    # SYNCHRONOUS METHODS
+    # BOTH MODES
     #
 
-    def end_task(self, close):
-        """Called at the end of a task and may launch another task.
-        """
-        if close:
-            # Note that self.running_tasks is left on, which has the
-            # side effect of preventing further requests from being
-            # serviced even if more appear.  A good thing.
-            self.close_when_done()
-            return
-        # Process requests held in the queue, if any.
-        while 1:
-            req = None
-            running_lock.acquire()
+    def queue_task(self, task):
+        """Queue a channel-related task to be executed in another thread."""
+        start = False
+        task_lock.acquire()
+        try:
+            if self.tasks is None:
+                self.tasks = []
+            self.tasks.append(task)
+            if not self.running_tasks:
+                self.running_tasks = True
+                start = True
+        finally:
+            task_lock.release()
+        if start:
+            self.set_sync()
+            self.server.addTask(self)
+
+    #
+    # ITask implementation.  Delegates to the queued tasks.
+    #
+
+    def service(self):
+        """Execute all pending tasks"""
+        while True:
+            task = None
+            task_lock.acquire()
             try:
-                rr = self.ready_requests
-                if rr:
-                    req = rr.pop(0)
+                if self.tasks:
+                    task = self.tasks.pop(0)
                 else:
-                    # No requests to process.
-                    self.running_tasks = 0
+                    # No more tasks
+                    self.running_tasks = False
+                    self.set_async()
+                    break
             finally:
-                running_lock.release()
+                task_lock.release()
+            try:
+                task.service()
+            except:
+                # propagate the exception, but keep executing tasks
+                self.server.addTask(self)
+                raise
 
-            if req is not None:
-                task = self.process_request(req)
-                if task is not None:
-                    # Add the new task.  It will service the queue.
-                    self.server.addTask(task)
-                    break
-                # else check the queue again.
+    def cancel(self):
+        task_lock.acquire()
+        try:
+            if self.tasks:
+                old = self.tasks[:]
             else:
-                # Idle -- Wait for another request on this connection.
-                self.set_async()
-                break
+                old = []
+            self.tasks = []
+            self.running_tasks = False
+        finally:
+            task_lock.release()
+        try:
+            for task in old:
+                task.cancel()
+        finally:
+            self.set_async()
 
-    #
-    # BOTH MODES
-    #
+    def defer(self):
+        pass
 
-    def process_request(self, req):
-        """Returns a task to execute or None if the request is quick and
-        can be processed in the main thread.
-
-        Override to handle some requests in the main thread.
-        """
-        return self.task_class(self, req)



More information about the Zope3-Checkins mailing list