[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/FTP/tests - __init__.py:1.1.2.1 testFTPServer.py:1.1.2.1 testPublisherServer.py:1.1.2.1

Stephan Richter srichter@cbu.edu
Sat, 6 Apr 2002 14:57:10 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/Server/FTP/tests
In directory cvs.zope.org:/tmp/cvs-serv2221/tests

Added Files:
      Tag: Zope3-Server-Branch
	__init__.py testFTPServer.py testPublisherServer.py 
Log Message:
Started writing FTP Server tests. Some are fairly easy, but others are
hard, since FTP opens other connections as well.


=== Added File Zope3/lib/python/Zope/Server/FTP/tests/__init__.py ===
# Copyright (c) 2001 Zope Corporation and Contributors.  All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.

""" Unit tests for Zope.Server """


=== Added File Zope3/lib/python/Zope/Server/FTP/tests/testFTPServer.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

$Id: testFTPServer.py,v 1.1.2.1 2002/04/06 19:57:09 srichter Exp $
"""

import unittest
import tempfile
import os
from asyncore import socket_map, poll
import socket
from types import StringType

from threading import Thread
from Zope.Server.TaskThreads import ThreadedTaskDispatcher
from Zope.Server.FTP.FTPServer import FTPServer
from Zope.Server.FTP.FTPStatusMessages import status_msgs
from Zope.Server.Adjustments import Adjustments
from Zope.Server.ITask import ITask

from Zope.Server.Authentication.DictionaryAuthentication import \
     DictionaryAuthentication

import ftplib

from time import sleep, time

# Task dispatcher shared by all tests; setUp() sets its thread count and
# tearDown() shuts it down again.
td = ThreadedTaskDispatcher()

# Address the test server binds to and the test client connects to.
LOCALHOST = '127.0.0.1'
SERVER_PORT = 0      # Set these port numbers to 0 to auto-bind, or
CONNECT_TO_PORT = 0  # use specific numbers to inspect using TCPWatch.


# Adjustments instance handed to FTPServer via its ``adj`` argument.
my_adj = Adjustments()
# Reduce overflows to make testing easier.
my_adj.outbuf_overflow = 10000
my_adj.inbuf_overflow = 10000


class Tests(unittest.TestCase):
    """Exercise FTPServer commands over a raw control-connection socket.

    Every test gets a fresh ``FTPServer`` rooted in a temporary directory
    that contains a single ``test`` sub-directory.  The asyncore loop is
    driven from a background thread, and replies are compared against the
    templates in ``status_msgs``.
    """

    def setUp(self):
        """Create the temporary FTP root, start the server and poll thread."""
        td.setThreadCount(1)
        self.orig_map_size = len(socket_map)

        # mktemp() only hands back a name; os.mkdir() raises if the path
        # has been created by someone else in the meantime.
        self.root_dir = tempfile.mktemp()
        os.mkdir(self.root_dir)
        os.mkdir(os.path.join(self.root_dir, 'test'))

        self.server = FTPServer(LOCALHOST, SERVER_PORT, self.root_dir,
                                DictionaryAuthentication({'foo': 'bar'}),
                                task_dispatcher=td, adj=my_adj)
        if CONNECT_TO_PORT == 0:
            # Auto-bound: ask the listening socket which port it received.
            self.port = self.server.socket.getsockname()[1]
        else:
            self.port = CONNECT_TO_PORT
        self.run_loop = 1
        self.counter = 0
        self.thread = Thread(target=self.loop)
        self.thread.start()
        sleep(0.1)  # Give the thread some time to start.


    def tearDown(self):
        """Stop the poll thread and the server, then remove the FTP root."""
        # Function-scope import: only this cleanup step needs shutil.
        import shutil

        self.run_loop = 0
        self.thread.join()
        td.shutdown()
        self.server.close()
        # Make sure all sockets get closed by asyncore normally.
        # (Check currently disabled.)
        #timeout = time() + 5
        #while 1:
        #    if len(socket_map) == self.orig_map_size:
        #        # Clean!
        #        break
        #    if time() >= timeout:
        #        print 'Leaked a socket: %s' % `socket_map`
        #        break
        #        #self.fail('Leaked a socket: %s' % `socket_map`)
        #    poll(0.1, socket_map)

        # Remove the per-test FTP root so repeated runs do not pile up
        # directories in the temp location.  Best effort: a cleanup
        # problem must not hide the outcome of the test itself.
        shutil.rmtree(self.root_dir, ignore_errors=1)


    def loop(self):
        """Poll the asyncore socket map until ``run_loop`` is cleared."""
        while self.run_loop:
            self.counter = self.counter + 1
            # print 'loop', self.counter
            poll(0.1, socket_map)


    def getFTPConnection(self, login=1):
        """Open a control connection to the test server and return the socket.

        The server greeting is checked first.  When ``login`` is true the
        connection is also authenticated as user ``foo`` with password
        ``bar``, checking each reply.  If any step fails the socket is
        closed before the error propagates.
        """
        ftp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            ftp.connect((LOCALHOST, self.port))
            result = ftp.recv(10000)
            self.assertEqual(result, status_msgs['SERVER_READY'] %(
                'localhost.localdomain') + '\r\n')
            if login:
                ftp.send('USER foo\r\n')
                self.assertEqual(ftp.recv(1024),
                                 status_msgs['PASS_REQUIRED'] +'\r\n')
                ftp.send('PASS bar\r\n')
                self.assertEqual(ftp.recv(1024),
                                 status_msgs['LOGIN_SUCCESS'] +'\r\n')
        except:
            # Do not leak the socket when the handshake fails; re-raise so
            # the test still reports the original error.
            ftp.close()
            raise

        return ftp


    def execute(self, commands, login=1):
        """Send command(s) on a new connection and return the last reply.

        ``commands`` is either one command string or a sequence of them.
        Each command is sent terminated by CRLF and followed by a single
        ``recv``.  The final reply must end with CRLF, which is stripped
        from the returned string.  ``login`` is passed through to
        ``getFTPConnection``.
        """
        if isinstance(commands, StringType):
            commands = (commands,)

        ftp = self.getFTPConnection(login)
        # Start with an empty reply so that an empty command sequence fails
        # the CRLF assertion below instead of hitting an unbound name.
        result = ''
        try:
            for command in commands:
                ftp.send('%s\r\n' %command)
                result = ftp.recv(10000)
                #print result[:-2]
        finally:
            # Always release the socket, even if send/recv raised.
            ftp.close()

        self.failUnless(result.endswith('\r\n'))
        return result[:-2]


    def testABOR(self):
        # ABOR after login answers with TRANSFER_ABORTED.
        self.assertEqual(self.execute('ABOR', 1),
                         status_msgs['TRANSFER_ABORTED'])


    def testCDUP(self):
        # CDUP succeeds both from a sub-directory and from the root.
        self.assertEqual(self.execute(('CWD test', 'CDUP'), 1),
                         status_msgs['SUCCESS_250'] %'CDUP')
        self.assertEqual(self.execute('CDUP', 1),
                         status_msgs['SUCCESS_250'] %'CDUP')


    def testCWD(self):
        # 'test' is created in setUp(); 'foo' does not exist.
        self.assertEqual(self.execute('CWD test', 1),
                         status_msgs['SUCCESS_250'] %'CWD')
        self.assertEqual(self.execute('CWD foo', 1),
                         status_msgs['ERR_NO_DIR'] %'/foo')


    def testDELE(self):
        # Create the file to delete, closing it explicitly so the data is
        # on disk (and the handle released) before the server touches it.
        f = open(os.path.join(self.root_dir, 'foo'), 'w')
        try:
            f.write('blah')
        finally:
            f.close()

        self.assertEqual(self.execute('DELE foo', 1),
                         status_msgs['SUCCESS_250'] %'DELE')
        # A missing file and a missing argument are both rejected.
        self.assertEqual(self.execute('DELE bar', 1),
                         status_msgs['ERR_DELETE_FILE'])
        self.assertEqual(self.execute('DELE', 1),
                         status_msgs['CMD_UNKNOWN'] %'DELE')


    def testHELP(self):
        # Only the first reply line is compared for now; the lines below
        # sketch the rest of the multi-line answer.
        result = status_msgs['HELP_START'] #+ '\r\n'
        #result += 'Help goes here somewhen.\r\n'
        #result += status_msgs['HELP_END']

        self.assertEqual(self.execute('HELP', 1), result)


    def testLIST(self):
        # Listing a path that does not exist reports the OS error text.
        path = os.path.join(self.root_dir, 'foo')
        result = "[Errno 2] No such file or directory: '%s'" %path

        self.assertEqual(self.execute('LIST /foo', 1),
                         status_msgs['ERR_NO_LIST'] %result)
        # NOTE(review): expecting SUCCESS_200 % 'NOOP' for a bare LIST looks
        # like a placeholder copied from testNOOP -- confirm against the
        # server's actual reply before changing it.
        self.assertEqual(self.execute('LIST', 1),
                         status_msgs['SUCCESS_200'] %'NOOP')


    def testNOOP(self):
        # NOOP is accepted with and without a prior login.
        self.assertEqual(self.execute('NOOP', 0),
                         status_msgs['SUCCESS_200'] %'NOOP')
        self.assertEqual(self.execute('NOOP', 1),
                         status_msgs['SUCCESS_200'] %'NOOP')


    def testPASS(self):
        # PASS without a preceding USER, and a wrong user name, both yield
        # LOGIN_MISMATCH.
        self.assertEqual(self.execute('PASS', 0),
                         status_msgs['LOGIN_MISMATCH'])
        self.assertEqual(self.execute(('USER blah', 'PASS bar'), 0),
                         status_msgs['LOGIN_MISMATCH'])


    def testQUIT(self):
        # QUIT is accepted with and without a prior login.
        self.assertEqual(self.execute('QUIT', 0),
                         status_msgs['GOODBYE'])
        self.assertEqual(self.execute('QUIT', 1),
                         status_msgs['GOODBYE'])


    def testUSER(self):
        # USER with a name asks for the password; without one it is
        # rejected as an unknown command.
        self.assertEqual(self.execute('USER foo', 0),
                         status_msgs['PASS_REQUIRED'])
        self.assertEqual(self.execute('USER', 0),
                         status_msgs['CMD_UNKNOWN'] %'USER')


def test_suite():
    """Return a suite holding every test method of ``Tests``."""
    return unittest.TestLoader().loadTestsFromTestCase(Tests)

# Run the FTP server tests with the plain text runner when this file is
# executed as a script.
if __name__ == '__main__':
    runner = unittest.TextTestRunner()
    runner.run(test_suite())


=== Added File Zope3/lib/python/Zope/Server/FTP/tests/testPublisherServer.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors.  All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 1.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
##############################################################################
"""

$Id: testPublisherServer.py,v 1.1.2.1 2002/04/06 19:57:09 srichter Exp $
"""

#import unittest
#from asyncore import socket_map, poll
#import sys
#
#from threading import Thread
#from Zope.Server.TaskThreads import ThreadedTaskDispatcher
#from Zope.Server.HTTP.PublisherHTTPServer import PublisherHTTPServer
#from Zope.Publisher.Browser.BrowserRequest import BrowserRequest
#
#from Zope.Publisher.DefaultPublication import DefaultPublication
#from Zope.Publisher.Exceptions import Redirect, Retry
#from Zope.Publisher.HTTP import HTTPRequest
#
#from httplib import HTTPConnection
#
#from time import sleep, time
#
#td = ThreadedTaskDispatcher()
#
#LOCALHOST = '127.0.0.1'
#
#HTTPRequest.STAGGER_RETRIES = 0  # Don't pause.
#
#
#class Conflict (Exception):
#    """
#    Pseudo ZODB conflict error.
#    """
#
#
#class PublicationWithConflict(DefaultPublication):
#
#    def handleException(self, request, exc_info, retry_allowed=1):
#        if exc_info[0] is Conflict and retry_allowed:
#            # This simulates a ZODB retry.
#            raise Retry(exc_info)
#        else:
#            DefaultPublication.handleException(self, request, exc_info,
#                                               retry_allowed)
#
#
#class tested_object:
#    " "
#    tries = 0
#
#    def __call__(self, REQUEST):
#        return 'URL invoked: %s' % REQUEST.URL
#
#    def redirect_method(self, REQUEST):
#        "Generates a redirect using the redirect() method."
#        REQUEST.getResponse().redirect("http://somewhere.com/redirect")
#
#    def redirect_exception(self):
#        "Generates a redirect using an exception."
#        raise Redirect("http://somewhere.com/exception")
#
#    def conflict(self, REQUEST, wait_tries):
#        """
#        Returns 202 status only after (wait_tries) tries.
#        """
#        if self.tries >= int(wait_tries):
#            raise "Accepted"
#        else:
#            self.tries += 1
#            raise Conflict
#
#
#
#class Tests(unittest.TestCase):
#
#    def setUp(self):
#
#        obj = tested_object()
#        obj.folder = tested_object()
#        obj.folder.item = tested_object()
#
#        obj._protected = tested_object()
#
#        pub = PublicationWithConflict(obj)
#
#        def request_factory(input_stream, output_steam, env):
#            request = BrowserRequest(input_stream, output_steam, env)
#            request.setPublication(pub)
#            return request
#
#        td.setThreadCount(4)
#        # Bind to any port on localhost.
#        self.server = PublisherHTTPServer(request_factory, 'Browser',
#                                          LOCALHOST, 0, task_dispatcher=td)
#        self.port = self.server.socket.getsockname()[1]
#        self.run_loop = 1
#        self.thread = Thread(target=self.loop)
#        self.thread.start()
#        sleep(0.1)  # Give the thread some time to start.
#
#    def tearDown(self):
#        self.run_loop = 0
#        self.thread.join()
#        td.shutdown()
#        self.server.close()
#
#    def loop(self):
#        while self.run_loop:
#            poll(0.1, socket_map)
#
#    def testResponse(self, path='/', status_expected=200,
#                     add_headers=None, request_body=''):
#        h = HTTPConnection(LOCALHOST, self.port)
#        h.putrequest('GET', path)
#        h.putheader('Accept', 'text/plain')
#        if add_headers:
#            for k, v in add_headers.items():
#                h.putheader(k, v)
#        if request_body:
#            h.putheader('Content-Length', str(int(len(request_body))))
#        h.endheaders()
#        if request_body:
#            h.send(request_body)
#        response = h.getresponse()
#        length = int(response.getheader('Content-Length', '0'))
#        if length:
#            response_body = response.read(length)
#        else:
#            response_body = ''
#        # XXX How to test this now that we don't set the response code?
#        ##self.failUnlessEqual(int(response.status), status_expected)
#        self.failUnlessEqual(length, len(response_body))
#
#        if (status_expected == 200):
#            if path == '/': path = ''
#            expect_response = 'URL invoked: http://%s:%d%s' % (LOCALHOST,
#                self.port, path)
#            self.failUnlessEqual(response_body, expect_response)
#
#    def testDeeperPath(self):
#        self.testResponse(path='/folder/item')
#
#    def testNotFound(self):
#        self.testResponse(path='/foo/bar', status_expected=404)
#
#    def testUnauthorized(self):
#        self.testResponse(path='/_protected', status_expected=401)
#
#    def testRedirectMethod(self):
#        self.testResponse(path='/redirect_method', status_expected=302)
#
#    def testRedirectException(self):
#        self.testResponse(path='/redirect_exception', status_expected=302)
#        self.testResponse(path='/folder/redirect_exception',
#                          status_expected=302)
#
#    def testConflictRetry(self):
#        # Expect the "Accepted" response since the retries will succeed.
#        self.testResponse(path='/conflict?wait_tries=2', status_expected=202)
#
#    def testFailedConflictRetry(self):
#        # Expect a "Conflict" response since there will be too many
#        # conflicts.
#        self.testResponse(path='/conflict?wait_tries=10', status_expected=409)
#
#
#
#def test_suite():
#    loader = unittest.TestLoader()
#    return loader.loadTestsFromTestCase(Tests)
#
#if __name__=='__main__':
#    unittest.TextTestRunner().run( test_suite() )