[Zope3-checkins] CVS: Zope3/src/zope/server/ftp/tests - __init__.py:1.2 test_ftpserver.py:1.2

Jim Fulton jim@zope.com
Wed, 25 Dec 2002 09:15:55 -0500


Update of /cvs-repository/Zope3/src/zope/server/ftp/tests
In directory cvs.zope.org:/tmp/cvs-serv20790/src/zope/server/ftp/tests

Added Files:
	__init__.py test_ftpserver.py 
Log Message:
Grand renaming:

- Renamed most files (especially python modules) to lower case.

- Moved views and interfaces into separate hierarchies within each
  project, where each top-level directory under the zope package
  is a separate project.

- Moved everything to src from lib/python.

  lib/python will eventually go away. I need access to the cvs
  repository to make this happen, however.

There are probably some bits that are broken. All tests pass
and zope runs, but I haven't tried everything. There are a number
of cleanups I'll work on tomorrow.



=== Zope3/src/zope/server/ftp/tests/__init__.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:15:55 2002
+++ Zope3/src/zope/server/ftp/tests/__init__.py	Wed Dec 25 09:15:24 2002
@@ -0,0 +1 @@
+#


=== Zope3/src/zope/server/ftp/tests/test_ftpserver.py 1.1 => 1.2 ===
--- /dev/null	Wed Dec 25 09:15:55 2002
+++ Zope3/src/zope/server/ftp/tests/test_ftpserver.py	Wed Dec 25 09:15:24 2002
@@ -0,0 +1,265 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+
+$Id$
+"""
+
+import asyncore
+import unittest
+import tempfile
+import os
+import socket
+import shutil
+import sys
+import traceback
+from types import StringType
+from StringIO import StringIO
+
+from threading import Thread, Event
+from zope.server.taskthreads import ThreadedTaskDispatcher
+from zope.server.ftp.ftpserver import FTPServer
+from zope.server.ftp.ftpstatusmessages import status_msgs
+from zope.server.adjustments import Adjustments
+from zope.server.interfaces import ITask
+
+from zope.server.vfs.osfilesystem import OSFileSystem
+from zope.server.ftp.testfilesystemaccess import TestFilesystemAccess
+
+from zope.server.tests.asyncerror import AsyncoreErrorHook
+
+import ftplib
+
+from time import sleep, time
+
+td = ThreadedTaskDispatcher()
+
+LOCALHOST = '127.0.0.1'
+SERVER_PORT = 0      # Set these port numbers to 0 to auto-bind, or
+CONNECT_TO_PORT = 0  # use specific numbers to inspect using TCPWatch.
+
+
+my_adj = Adjustments()
+
+
+def retrlines(ftpconn, cmd):
+    res = []
+    ftpconn.retrlines(cmd, res.append)
+    return ''.join(res)
+
+
+class Tests(unittest.TestCase, AsyncoreErrorHook):
+
+    def setUp(self):
+        td.setThreadCount(1)
+        self.orig_map_size = len(asyncore.socket_map)
+        self.hook_asyncore_error()
+
+        self.root_dir = tempfile.mktemp()
+        os.mkdir(self.root_dir)
+        os.mkdir(os.path.join(self.root_dir, 'test'))
+
+        fs = OSFileSystem(self.root_dir)
+        fs_access = TestFilesystemAccess(fs)
+
+        self.server = FTPServer(LOCALHOST, SERVER_PORT, fs_access,
+                                task_dispatcher=td, adj=my_adj)
+        if CONNECT_TO_PORT == 0:
+            self.port = self.server.socket.getsockname()[1]
+        else:
+            self.port = CONNECT_TO_PORT
+        self.run_loop = 1
+        self.counter = 0
+        self.thread_started = Event()
+        self.thread = Thread(target=self.loop)
+        self.thread.start()
+        self.thread_started.wait(10.0)
+        self.assert_(self.thread_started.isSet())
+
+    def tearDown(self):
+        self.run_loop = 0
+        self.thread.join()
+        td.shutdown()
+        self.server.close()
+        # Make sure all sockets get closed by asyncore normally.
+        timeout = time() + 2
+        while 1:
+            if len(asyncore.socket_map) == self.orig_map_size:
+                # Clean!
+                break
+            if time() >= timeout:
+                self.fail('Leaked a socket: %s' % `asyncore.socket_map`)
+                break
+            asyncore.poll(0.1)
+
+        self.unhook_asyncore_error()
+
+        if os.path.exists(self.root_dir):
+            shutil.rmtree(self.root_dir)
+
+    def loop(self):
+        self.thread_started.set()
+        import select
+        from errno import EBADF
+        while self.run_loop:
+            self.counter = self.counter + 1
+            # print 'loop', self.counter
+            try:
+                asyncore.poll(0.1)
+            except select.error, data:
+                if data[0] == EBADF:
+                    print "exception polling in loop(): ", data
+                else:
+                    raise
+
+    def getFTPConnection(self, login=1):
+        ftp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        ftp.connect((LOCALHOST, self.port))
+        result = ftp.recv(10000).split()[0]
+        self.assertEqual(result, '220')
+        if login:
+            ftp.send('USER foo\r\n')
+            self.assertEqual(ftp.recv(1024),
+                             status_msgs['PASS_REQUIRED'] +'\r\n')
+            ftp.send('PASS bar\r\n')
+            self.assertEqual(ftp.recv(1024),
+                             status_msgs['LOGIN_SUCCESS'] +'\r\n')
+
+        return ftp
+
+
+    def execute(self, commands, login=1):
+        ftp = self.getFTPConnection(login)
+
+        try:
+            if type(commands) is StringType:
+                commands = (commands,)
+
+            for command in commands:
+                ftp.send('%s\r\n' %command)
+                result = ftp.recv(10000)
+
+            self.failUnless(result.endswith('\r\n'))
+        finally:
+            ftp.close()
+        return result.split('\r\n')[0]
+
+
+    def testABOR(self):
+        self.assertEqual(self.execute('ABOR', 1),
+                         status_msgs['TRANSFER_ABORTED'])
+
+
+    def testCDUP(self):
+        self.assertEqual(self.execute(('CWD test', 'CDUP'), 1),
+                         status_msgs['SUCCESS_250'] %'CDUP')
+        self.assertEqual(self.execute('CDUP', 1),
+                         status_msgs['SUCCESS_250'] %'CDUP')
+
+
+    def testCWD(self):
+        self.assertEqual(self.execute('CWD test', 1),
+                         status_msgs['SUCCESS_250'] %'CWD')
+        self.assertEqual(self.execute('CWD foo', 1),
+                         status_msgs['ERR_NO_DIR'] %'/foo')
+
+
+    def testDELE(self):
+        open(os.path.join(self.root_dir, 'foo'), 'w').write('blah')
+        self.assertEqual(self.execute('DELE foo', 1),
+                         status_msgs['SUCCESS_250'] %'DELE')
+        res = self.execute('DELE bar', 1).split()[0]
+        self.assertEqual(res, '550')
+        self.assertEqual(self.execute('DELE', 1),
+                         status_msgs['ERR_ARGS'])
+
+
+    def XXXtestHELP(self):
+        # XXX This test doesn't work.  I think it is because execute()
+        #     doesn't read the whole reply.  The execeute() helper
+        #     function should be fixed, but that's for another day.
+        result = status_msgs['HELP_START'] + '\r\n'
+        result += 'Help goes here somewhen.\r\n'
+        result += status_msgs['HELP_END']
+
+        self.assertEqual(self.execute('HELP', 1), result)
+
+
+    def testLIST(self):
+        conn = ftplib.FTP()
+        try:
+            conn.connect(LOCALHOST, self.port)
+            conn.login('foo', 'bar')
+            self.assertRaises(ftplib.Error, retrlines, conn, 'LIST /foo')
+            listing = retrlines(conn, 'LIST')
+            self.assert_(len(listing) > 0)
+        finally:
+            conn.close()
+        # Make sure no garbage was left behind.
+        self.testNOOP()
+
+
+    def testNOOP(self):
+        self.assertEqual(self.execute('NOOP', 0),
+                         status_msgs['SUCCESS_200'] %'NOOP')
+        self.assertEqual(self.execute('NOOP', 1),
+                         status_msgs['SUCCESS_200'] %'NOOP')
+
+
+    def testPASS(self):
+        self.assertEqual(self.execute('PASS', 0),
+                         status_msgs['LOGIN_MISMATCH'])
+        self.assertEqual(self.execute(('USER blah', 'PASS bar'), 0),
+                         status_msgs['LOGIN_MISMATCH'])
+
+
+    def testQUIT(self):
+        self.assertEqual(self.execute('QUIT', 0),
+                         status_msgs['GOODBYE'])
+        self.assertEqual(self.execute('QUIT', 1),
+                         status_msgs['GOODBYE'])
+
+
+    def testSTOR(self):
+        conn = ftplib.FTP()
+        try:
+            conn.connect(LOCALHOST, self.port)
+            conn.login('foo', 'bar')
+            fp = StringIO('Speak softly')
+            # Can't overwrite directory
+            self.assertRaises(
+                ftplib.error_perm, conn.storbinary, 'STOR /test', fp)
+            fp = StringIO('Charity never faileth')
+            # Successful write
+            conn.storbinary('STOR /stuff', fp)
+        finally:
+            conn.close()
+        # Make sure no garbage was left behind.
+        self.testNOOP()
+
+
+    def testUSER(self):
+        self.assertEqual(self.execute('USER foo', 0),
+                         status_msgs['PASS_REQUIRED'])
+        self.assertEqual(self.execute('USER', 0),
+                         status_msgs['ERR_ARGS'])
+
+
+
+def test_suite():
+    loader = unittest.TestLoader()
+    return loader.loadTestsFromTestCase(Tests)
+
+if __name__=='__main__':
+    unittest.TextTestRunner().run( test_suite() )