[Zope3-checkins] SVN: Zope3/trunk/src/zope/server/ Another go at fixing the asyncore errors.

Shane Hathaway shane at zope.com
Sun Sep 12 02:02:39 EDT 2004


Log message for revision 27500:
  Another go at fixing the asyncore errors.
  
  Strategy:
  1. Move work into the main thread, since it's more predictable than 
  threads.
  2. Work around asyncore idiosyncracies.  For example, I just found out 
  that if you close() twice, you might remove some other socket from the 
  map.  Wow, dude.
  3. Run the unit tests repeatedly, although they still pass 100% of the 
  time for me.  Bummer.
  
  


Changed:
  U   Zope3/trunk/src/zope/server/dualmodechannel.py
  U   Zope3/trunk/src/zope/server/ftp/server.py
  U   Zope3/trunk/src/zope/server/serverchannelbase.py


-=-
Modified: Zope3/trunk/src/zope/server/dualmodechannel.py
===================================================================
--- Zope3/trunk/src/zope/server/dualmodechannel.py	2004-09-11 22:24:02 UTC (rev 27499)
+++ Zope3/trunk/src/zope/server/dualmodechannel.py	2004-09-12 06:02:38 UTC (rev 27500)
@@ -38,11 +38,11 @@
     the main loop.
     """
 
-    # will_close is set to 1 to close the socket.
-    will_close = 0
+    # will_close is set to True to close the socket.
+    will_close = False
 
     # boolean: async or sync mode
-    async_mode = 1
+    async_mode = True
 
     def __init__(self, conn, addr, adj=None):
         self.addr = addr
@@ -110,13 +110,13 @@
 
         The main thread will stop calling received().
         """
-        self.async_mode = 0
+        self.async_mode = False
 
     #
     # SYNCHRONOUS METHODS
     #
 
-    def flush(self, block=1):
+    def flush(self, block=True):
         """Sends pending data.
 
         If block is set, this pauses the application.  If it is turned
@@ -127,13 +127,13 @@
             while self._flush_some():
                 pass
             return
-        blocked = 0
+        blocked = False
         try:
             while self.outbuf:
                 # We propagate errors to the application on purpose.
                 if not blocked:
                     self.socket.setblocking(1)
-                    blocked = 1
+                    blocked = True
                 self._flush_some()
         finally:
             if blocked:
@@ -144,7 +144,7 @@
 
         The main thread will begin calling received() again.
         """
-        self.async_mode = 1
+        self.async_mode = True
         self.pull_trigger()
 
     #
@@ -183,11 +183,11 @@
         # Flush all possible.
         while self._flush_some():
             pass
-        self.will_close = 1
+        self.will_close = True
         if not self.async_mode:
             # For safety, don't close the socket until the
             # main thread calls handle_write().
-            self.async_mode = 1
+            self.async_mode = True
             self.pull_trigger()
 
     def close(self):
@@ -195,4 +195,5 @@
         # closed in a thread, the main loop can end up with a bad file
         # descriptor.
         assert self.async_mode
+        self.connected = False
         asyncore.dispatcher.close(self)

Modified: Zope3/trunk/src/zope/server/ftp/server.py
===================================================================
--- Zope3/trunk/src/zope/server/ftp/server.py	2004-09-11 22:24:02 UTC (rev 27499)
+++ Zope3/trunk/src/zope/server/ftp/server.py	2004-09-12 06:02:38 UTC (rev 27500)
@@ -29,7 +29,7 @@
 from zope.server.interfaces.ftp import IFTPCommandHandler
 from zope.server.linereceiver.lineserverchannel import LineServerChannel
 from zope.server.serverbase import ServerBase
-from zope.server.dualmodechannel import DualModeChannel
+from zope.server.dualmodechannel import DualModeChannel, the_trigger
 
 status_messages = {
     'OPEN_DATA_CONN'   : '150 Opening %s mode data connection for file list',
@@ -92,16 +92,17 @@
 
 
     # List of commands that are always available
-    special_commands = ('cmd_quit', 'cmd_type', 'cmd_noop', 'cmd_user',
-                        'cmd_pass')
+    special_commands = (
+        'cmd_quit', 'cmd_type', 'cmd_noop', 'cmd_user', 'cmd_pass')
 
     # These are the commands that are accessing the filesystem.
     # Since this could be also potentially a longer process, these commands
     # are also the ones that are executed in a different thread.
-    thread_commands = ('cmd_appe', 'cmd_cdup', 'cmd_cwd', 'cmd_dele',
-                       'cmd_list', 'cmd_nlst', 'cmd_mdtm', 'cmd_mkd',
-                       'cmd_pass', 'cmd_retr', 'cmd_rmd', 'cmd_rnfr',
-                       'cmd_rnto', 'cmd_size', 'cmd_stor', 'cmd_stru')
+    thread_commands = (
+        'cmd_appe', 'cmd_cdup', 'cmd_cwd', 'cmd_dele',
+        'cmd_list', 'cmd_nlst', 'cmd_mdtm', 'cmd_mkd',
+        'cmd_pass', 'cmd_retr', 'cmd_rmd', 'cmd_rnfr',
+        'cmd_rnto', 'cmd_size', 'cmd_stor', 'cmd_stru')
 
     # Define the status messages
     status_messages = status_messages
@@ -125,11 +126,10 @@
     def __init__(self, server, conn, addr, adj=None):
         super(FTPServerChannel, self).__init__(server, conn, addr, adj)
 
-        self.client_addr = (addr[0], 21)
+        self.port_addr = None  # The client's PORT address
+        self.passive_listener = None  # The PASV listener
+        self.client_dc = None  # The data connection
 
-        self.passive_acceptor = None
-        self.client_dc = None
-
         self.transfer_mode = 'a'  # Have to default to ASCII :-|
         self.passive_mode = 0
         self.cwd = '/'
@@ -145,14 +145,15 @@
         """Open the filesystem using the current credentials."""
         return self.server.fs_access.open(self.credentials)
 
+
     def cmd_abor(self, args):
         'See IFTPCommandHandler'
         assert self.async_mode
         self.reply('TRANSFER_ABORTED')
-        if self.client_dc is not None:
-            self.client_dc.reported = True
-            self.client_dc.close()
+        self.abortPassive()
+        self.abortData()
 
+
     def cmd_appe (self, args):
         'See IFTPCommandHandler'
         return self.cmd_stor(args, 'a')
@@ -229,8 +230,7 @@
             self.reply('ERR_NO_LIST', str(err))
             return
         ok_reply = ('OPEN_DATA_CONN', self.type_map[self.transfer_mode])
-        cdc = XmitChannel(self, ok_reply)
-        self.client_dc = cdc
+        cdc = RETRChannel(self, ok_reply)
         try:
             cdc.write(s)
             cdc.close_when_done()
@@ -256,7 +256,6 @@
 
         path = self._generatePath(path)
 
-
         if fs.type(path) == 'd' and not directory:
             if long:
                 file_list = map(ls, fs.ls(path))
@@ -342,11 +341,13 @@
 
     def cmd_pasv(self, args):
         'See IFTPCommandHandler'
-        pc = self.newPassiveAcceptor()
-        self.client_dc = None
-        port = pc.addr[1]
-        ip_addr = pc.control_channel.getsockname()[0]
-        self.reply('PASV_MODE_MSG', (','.join(ip_addr.split('.')),
+        assert self.async_mode
+        # Kill any existing passive listener first.
+        self.abortPassive()
+        local_addr = self.getsockname()[0]
+        self.passive_listener = PassiveListener(self, local_addr)
+        port = self.passive_listener.port
+        self.reply('PASV_MODE_MSG', (','.join(local_addr.split('.')),
                                      port/256,
                                      port%256 ) )
 
@@ -360,7 +361,7 @@
         # I'm assuming one for now...
         # TODO: we should (optionally) verify that the
         # ip number belongs to the client.  [wu-ftpd does this?]
-        self.client_addr = (ip, port)
+        self.port_addr = (ip, port)
         self.reply('SUCCESS_200', 'PORT')
 
 
@@ -392,9 +393,8 @@
             self.restart_position = 0
 
         ok_reply = 'OPEN_CONN', (self.type_map[self.transfer_mode], path)
-        cdc = XmitChannel(self, ok_reply)
-        self.client_dc = cdc
-        outstream = ApplicationXmitStream(cdc)
+        cdc = RETRChannel(self, ok_reply)
+        outstream = ApplicationOutputStream(cdc)
 
         try:
             fs.readfile(path, outstream, start)
@@ -484,14 +484,13 @@
             self.reply('ERR_OPEN_WRITE', "Can't write file")
             return
 
-        cdc = RecvChannel(self, (path, mode, start))
-        self.client_dc = cdc
+        cdc = STORChannel(self, (path, mode, start))
+        self.syncConnectData(cdc)
         self.reply('OPEN_CONN', (self.type_map[self.transfer_mode], path))
-        self.connectDataChannel(cdc)
 
 
-    def finishedRecv(self, buffer, (path, mode, start)):
-        """Called by RecvChannel when the transfer is finished."""
+    def finishSTOR(self, buffer, (path, mode, start)):
+        """Called by STORChannel when the client has sent all data."""
         assert not self.async_mode
         try:
             infile = buffer.getfile()
@@ -530,10 +529,8 @@
         # if t not in ['a','e','i','l']:
         if t not in ['a','i','l']:
             self.reply('ERR_ARGS')
-
         elif t == 'l' and (len(args) > 2 and args[2] != '8'):
             self.reply('WRONG_BYTE_SIZE')
-
         else:
             self.transfer_mode = t
             self.reply('TYPE_SET_OK', self.type_map[t])
@@ -548,7 +545,6 @@
         else:
             self.reply('ERR_ARGS')
 
-    #
     ############################################################
 
     def _generatePath(self, args):
@@ -558,55 +554,61 @@
         path = posixpath.join(self.cwd, args)
         return posixpath.normpath(path)
 
-    def newPassiveAcceptor(self):
-        # ensure that only one of these exists at a time.
-        assert self.async_mode
-        if self.passive_acceptor is not None:
-            self.passive_acceptor.close()
-            self.passive_acceptor = None
-        self.passive_acceptor = PassiveAcceptor(self)
-        return self.passive_acceptor
+    def syncConnectData(self, cdc):
+        """Calls asyncConnectData in the asynchronous thread."""
+        the_trigger.pull_trigger(lambda: self.asyncConnectData(cdc))
 
-    def connectDataChannel(self, cdc):
-        """Attempt to connect the data channel."""
-        pa = self.passive_acceptor
-        if pa:
-            # PASV mode.
-            if pa.ready:
-                # a connection has already been made.
-                conn, addr = pa.ready
-                cdc.set_socket (conn)
-                cdc.connected = 1
-                self.passive_acceptor = None
-            # else we're still waiting for a connect to the PASV port.
-            # FTP Explorer is known to do this.
-        else:
-            # not in PASV mode.
-            ip, port = self.client_addr
-            cdc.create_socket(socket.AF_INET, socket.SOCK_STREAM)
-            if self.bind_local_minus_one:
-                cdc.bind(('', self.server.port - 1))
-            try:
-                cdc.connect((ip, port))
-            except socket.error:
-                self.reply('NO_DATA_CONN')
-                cdc.reported = True
-                cdc.close_when_done()
+    def asyncConnectData(self, cdc):
+        """Starts connecting the data channel.
 
+        This is a little complicated because the data connection might
+        be established already (in passive mode) or might be
+        established in the near future (in port or passive mode.)  If
+        the connection has already been established,
+        self.passive_listener already has a socket and is waiting for
+        a call to connectData().  If the connection has not been
+        established in passive mode, the passive listener will
+        remember the data channel and send it when it's ready.  In port
+        mode, this method tells the data connection to connect.
+        """
+        self.abortData()
+        self.client_dc = cdc
+        if self.passive_listener is not None:
+            # Connect via PASV
+            self.passive_listener.connectData(cdc)
+        if self.port_addr:
+            # Connect via PORT
+            a = self.port_addr
+            self.port_addr = None
+            cdc.connectPort(a)
+
+    def connectedPassive(self):
+        """Accepted a passive connection."""
+        self.passive_listener = None
+
+    def abortPassive(self):
+        """Close the passive listener."""
+        if self.passive_listener is not None:
+            self.passive_listener.abort()
+            self.passive_listener = None
+
+    def abortData(self):
+        """Close the data connection."""
+        if self.client_dc is not None:
+            self.client_dc.abort()
+            self.client_dc = None
+
     def closedData(self):
         self.client_dc = None
 
     def close(self):
+        # Make sure the passive listener and active client DC get closed.
+        self.abortPassive()
+        self.abortData()
         LineServerChannel.close(self)
-        # Make sure the client DC gets closed too.
-        cdc = self.client_dc
-        if cdc is not None:
-            self.client_dc = None
-            cdc.close()
 
 
 
-
 def ls(ls_info):
     """Formats a directory entry similarly to the 'ls' command.
     """
@@ -664,15 +666,15 @@
         )
 
 
-class PassiveAcceptor(asyncore.dispatcher):
+class PassiveListener(asyncore.dispatcher):
     """This socket accepts a data connection, used when the server has
        been placed in passive mode.  Although the RFC implies that we
-       ought to be able to use the same acceptor over and over again,
+       ought to be able to use the same listener over and over again,
        this presents a problem: how do we shut it off, so that we are
        accepting connections only when we expect them?  [we can't]
 
        wuftpd, and probably all the other servers, solve this by
-       allowing only one connection to hit this acceptor.  They then
+       allowing only one connection to hit this listener.  They then
        close it.  Any subsequent data-connection command will then try
        for the default port on the client side [which is of course
        never there].  So the 'always-send-PORT/PASV' behavior seems
@@ -681,51 +683,89 @@
        Another note: wuftpd will also be listening on the channel as
        soon as the PASV command is sent.  It does not wait for a data
        command first.
+       """
 
-       --- we need to queue up a particular behavior:
-       1) xmit : queue up producer[s]
-       2) recv : the file object
-
-       It would be nice if we could make both channels the same.
-       Hmmm.."""
-
-    ready = None
-
-    def __init__ (self, control_channel):
+    def __init__ (self, control_channel, local_addr):
         asyncore.dispatcher.__init__ (self)
         self.control_channel = control_channel
+        self.accepted = None  # The accepted socket address
+        self.client_dc = None  # The data connection to accept the socket
         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
-        # bind to an address on the interface that the
-        # control connection is coming from.
-        self.bind((self.control_channel.getsockname()[0], 0))
-        self.addr = self.getsockname()
+        self.closed = False
+        # bind to an address on the interface where the
+        # control connection is connected.
+        self.bind((local_addr, 0))
+        self.port = self.getsockname()[1]
         self.listen(1)
 
     def log (self, *ignore):
         pass
 
+    def abort(self):
+        """Abort the passive listener."""
+        if not self.closed:
+            self.closed = True
+            self.close()
+        if self.accepted is not None:
+            self.accepted.close()
+
     def handle_accept (self):
-        conn, addr = self.accept()
-        conn.setblocking(0)
-        dc = self.control_channel.client_dc
-        if dc is not None:
-            dc.set_socket(conn)
-            dc.addr = addr
-            dc.connected = 1
-            self.control_channel.passive_acceptor = None
-        else:
-            self.ready = conn, addr
+        self.accepted, addr = self.accept()
+        self.accepted.setblocking(0)
+        self.closed = True
         self.close()
+        if self.client_dc is not None:
+            self.connectData(self.client_dc)
 
+    def connectData(self, cdc):
+        """Sends the connection to the data channel.
 
+        If the connection has not yet been made, sends the connection
+        when it becomes available.
+        """
+        if self.accepted is not None:
+            cdc.set_socket(self.accepted)
+            # Note that this method will be called twice, once by the
+            # control channel, and once by handle_accept, and the two
+            # calls may come in either order.  If handle_accept calls
+            # first, we don't want to call set_socket() on the data
+            # connection twice, so set self.accepted = None to keep a
+            # record that the data connection already has the socket.
+            self.accepted = None
+            self.control_channel.connectedPassive()
+        else:
+            self.client_dc = cdc
+
+
 class FTPDataChannel(DualModeChannel):
-    """Base class for FTP data connections"""
+    """Base class for FTP data connections.
+
+    Note that data channels are always in async mode.
+    """
     
     def __init__ (self, control_channel):
         self.control_channel = control_channel
         self.reported = False
+        self.closed = False
         DualModeChannel.__init__(self, None, None, control_channel.adj)
 
+    def connectPort(self, client_addr):
+        """Connect to a port on the client"""
+        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        #if bind_local_minus_one:
+        #    self.bind(('', self.control_channel.server.port - 1))
+        try:
+            self.sock.connect(self.client_addr)
+        except socket.error:
+            self.report('NO_DATA_CONN')
+
+    def abort(self):
+        """Abort the data connection without reporting."""
+        self.reported = True
+        if not self.closed:
+            self.closed = True
+            self.close()
+
     def report(self, *reply_args):
         """Reports the result of the data transfer."""
         self.reported = True
@@ -740,18 +780,18 @@
         """Notifies the control channel when the data connection closes."""
         c = self.control_channel
         try:
-            if c is not None and not self.reported:
+            if c is not None and c.connected and not self.reported:
                 self.reportDefault()
         finally:
             self.control_channel = None
             DualModeChannel.close(self)
             if c is not None:
                 c.closedData()
-    
 
-class RecvChannel(FTPDataChannel):
-    """FTP data receive channel"""
 
+class STORChannel(FTPDataChannel):
+    """Channel for uploading one file from client to server"""
+
     complete_transfer = 0
     _fileno = None  # provide a default for asyncore.dispatcher._fileno
 
@@ -774,7 +814,7 @@
     def handle_close (self):
         """Client closed, indicating EOF."""
         c = self.control_channel
-        task = FinishedRecvTask(c, self.inbuf, self.finish_args)
+        task = FinishSTORTask(c, self.inbuf, self.finish_args)
         self.complete_transfer = 1
         self.close()
         c.queue_task(task)
@@ -782,12 +822,16 @@
     def reportDefault(self):
         if not self.complete_transfer:
             self.report('TRANSFER_ABORTED')
-        # else the transfer completed and FinishedRecvTask will
-        # provide a complete reply through finishedRecv().
+        # else the transfer completed and FinishSTORTask will
+        # provide a complete reply through finishSTOR().
 
 
-class FinishedRecvTask(object):
+class FinishSTORTask(object):
+    """Calls control_channel.finishSTOR() in an application thread.
 
+    This task executes after the client has finished uploading.
+    """
+
     implements(ITask)
 
     def __init__(self, control_channel, inbuf, finish_args):
@@ -802,7 +846,7 @@
         c = self.control_channel
         try:
             try:
-                c.finishedRecv(self.inbuf, self.finish_args)
+                c.finishSTOR(self.inbuf, self.finish_args)
             except socket.error:
                 close_on_finish = 1
                 if c.adj.log_socket_errors:
@@ -820,22 +864,21 @@
         pass
 
 
-class XmitChannel(FTPDataChannel):
-    """FTP data send channel"""
+class RETRChannel(FTPDataChannel):
+    """Channel for downloading one file from server to client"""
 
     opened = 0
     _fileno = None  # provide a default for asyncore.dispatcher._fileno
 
     def __init__ (self, control_channel, ok_reply_args):
         self.ok_reply_args = ok_reply_args
-        self.set_sync()
         FTPDataChannel.__init__(self, control_channel)
 
     def _open(self):
         """Signal the client to open the connection."""
         self.opened = 1
         self.control_channel.reply(*self.ok_reply_args)
-        self.control_channel.connectDataChannel(self)
+        self.control_channel.asyncConnectData(self)
 
     def write(self, data):
         if self.control_channel is None:
@@ -875,14 +918,16 @@
             self.report('TRANSFER_ABORTED')
 
 
-class ApplicationXmitStream(object):
-    """Provide stream output, remapping close() to close_when_done().
+class ApplicationOutputStream(object):
+    """Provide stream output to RETRChannel.
+
+    Maps close() to close_when_done().
     """
 
-    def __init__(self, xmit_channel):
-        self.write = xmit_channel.write
-        self.flush = xmit_channel.flush
-        self.close = xmit_channel.close_when_done
+    def __init__(self, retr_channel):
+        self.write = retr_channel.write
+        self.flush = retr_channel.flush
+        self.close = retr_channel.close_when_done
 
 
 class FTPServer(ServerBase):

Modified: Zope3/trunk/src/zope/server/serverchannelbase.py
===================================================================
--- Zope3/trunk/src/zope/server/serverchannelbase.py	2004-09-11 22:24:02 UTC (rev 27499)
+++ Zope3/trunk/src/zope/server/serverchannelbase.py	2004-09-12 06:02:38 UTC (rev 27500)
@@ -211,6 +211,7 @@
                 raise
 
     def cancel(self):
+        """Cancels all pending tasks"""
         task_lock.acquire()
         try:
             if self.tasks:



More information about the Zope3-Checkins mailing list