[Zope-Checkins] CVS: Zope3/lib/python/Zope/Server/tests - testHTTPServer.py:1.1.2.8 testPublisherServer.py:1.1.2.4

Shane Hathaway shane@digicool.com
Mon, 7 Jan 2002 10:39:04 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/Server/tests
In directory cvs.zope.org:/tmp/cvs-serv31823/tests

Modified Files:
      Tag: Zope-3x-branch
	testHTTPServer.py testPublisherServer.py 
Log Message:
- Made next_channel_cleanup a list so that it can be more easily overridden
  by subclasses.

- Changed the name "channel_type" to "channel_base_class" for clarity.

- Renamed the "tasks" parameter to "task_dispatcher".

- Separated out the default server name computation into a method.

- Removed comments that are better described in interfaces.

- Provided a way to use an alternate socket map.


=== Zope3/lib/python/Zope/Server/tests/testHTTPServer.py 1.1.2.7 => 1.1.2.8 ===
 from Zope.Server.TaskThreads import ThreadedTaskDispatcher
 from Zope.Server.HTTPServer import http_task, http_channel, http_server
-from Zope.Server.dual_mode_channel import pull_trigger
 from Zope.Server.Adjustments import Adjustments
 from Zope.Server.ITask import ITask
 
@@ -23,7 +22,7 @@
 
 from time import sleep, time
 
-tasks = ThreadedTaskDispatcher()
+td = ThreadedTaskDispatcher()
 
 LOCALHOST = '127.0.0.1'
 
@@ -76,10 +75,11 @@
 class Tests(unittest.TestCase):
 
     def setUp(self):
-        tasks.setThreadCount(4)
+        td.setThreadCount(4)
         # Bind to any port on localhost.
         self.orig_map_size = len(socket_map)
-        self.server = EchoHTTPServer(LOCALHOST, 0, tasks=tasks, adj=my_adj)
+        self.server = EchoHTTPServer(LOCALHOST, 0, task_dispatcher=td,
+                                     adj=my_adj)
         self.port = self.server.socket.getsockname()[1]
         self.run_loop = 1
         self.counter = 0
@@ -90,7 +90,7 @@
     def tearDown(self):
         self.run_loop = 0
         self.thread.join()
-        tasks.shutdown()
+        td.shutdown()
         self.server.close()
         # Make sure all sockets get closed by asyncore normally.
         timeout = time() + 5
@@ -205,13 +205,13 @@
     def testThreading(self):
         # Ensures the correct number of threads keep running.
         for n in range(4):
-            tasks.addTask(SleepingTask())
+            td.addTask(SleepingTask())
         # Try to confuse the task manager.
-        tasks.setThreadCount(2)
-        tasks.setThreadCount(1)
+        td.setThreadCount(2)
+        td.setThreadCount(1)
         sleep(0.5)
         # There should be 1 still running.
-        self.failUnlessEqual(len(tasks.threads), 1)
+        self.failUnlessEqual(len(td.threads), 1)
 
     def testChunkingRequestWithoutContent(self):
         h = HTTPConnection(LOCALHOST, self.port)


=== Zope3/lib/python/Zope/Server/tests/testPublisherServer.py 1.1.2.3 => 1.1.2.4 ===
 from time import sleep, time
 
-tasks = ThreadedTaskDispatcher()
+td = ThreadedTaskDispatcher()
 
 LOCALHOST = '127.0.0.1'
 
@@ -89,10 +89,10 @@
         request_payload = BrowserRequestPayload(pub)
         response_payload = BrowserResponsePayload()
 
-        tasks.setThreadCount(4)
+        td.setThreadCount(4)
         # Bind to any port on localhost.
         self.server = PublisherHTTPServer(request_payload, response_payload, 
-            LOCALHOST, 0, tasks=tasks)
+                                          LOCALHOST, 0, task_dispatcher=td)
         self.port = self.server.socket.getsockname()[1]
         self.run_loop = 1
         self.thread = Thread(target=self.loop)
@@ -102,7 +102,7 @@
     def tearDown(self):
         self.run_loop = 0
         self.thread.join()
-        tasks.shutdown()
+        td.shutdown()
         self.server.close()
 
     def loop(self):