[Zope3-checkins] SVN: zope.testing/branches/benji-parallelize-subprocesses/src/zope/testing/testrunner/ - add test for running subprocesses in parallel

Benji York benji at zope.com
Sat Jul 5 20:11:04 EDT 2008


Log message for revision 88049:
  - add test for running subprocesses in parallel
  - make output ordered the same whether running parallel subprocesses or not
  

Changed:
  U   zope.testing/branches/benji-parallelize-subprocesses/src/zope/testing/testrunner/runner.py
  U   zope.testing/branches/benji-parallelize-subprocesses/src/zope/testing/testrunner/testrunner-layers.txt

-=-
Modified: zope.testing/branches/benji-parallelize-subprocesses/src/zope/testing/testrunner/runner.py
===================================================================
--- zope.testing/branches/benji-parallelize-subprocesses/src/zope/testing/testrunner/runner.py	2008-07-06 00:06:23 UTC (rev 88048)
+++ zope.testing/branches/benji-parallelize-subprocesses/src/zope/testing/testrunner/runner.py	2008-07-06 00:11:03 UTC (rev 88049)
@@ -229,8 +229,9 @@
 
         if should_resume:
             setup_layers = None
-            self.ran += resume_tests(self.options, self.features,
-                layers_to_run, self.failures, self.errors)
+            if layers_to_run:
+                self.ran += resume_tests(self.options, self.features,
+                    layers_to_run, self.failures, self.errors)
 
         if setup_layers:
             if self.options.resume_layer == None:
@@ -368,104 +369,127 @@
     def runTest(self):
         "Layer set up failure."
 
-def spawn_layer_in_subprocess(rans, options, features, layer_name, layer,
+def spawn_layer_in_subprocess(result, options, features, layer_name, layer,
         failures, errors, resume_number):
-    args = [sys.executable,
-            sys.argv[0],
-            '--resume-layer', layer_name, str(resume_number),
-            ]
-    for d in options.testrunner_defaults:
-        args.extend(['--default', d])
+    try:
+        args = [sys.executable,
+                sys.argv[0],
+                '--resume-layer', layer_name, str(resume_number),
+                ]
+        for d in options.testrunner_defaults:
+            args.extend(['--default', d])
 
-    args.extend(options.original_testrunner_args[1:])
+        args.extend(options.original_testrunner_args[1:])
 
-    # this is because of a bug in Python (http://www.python.org/sf/900092)
-    if (options.profile == 'hotshot'
-        and sys.version_info[:3] <= (2,4,1)):
-        args.insert(1, '-O')
+        # this is because of a bug in Python (http://www.python.org/sf/900092)
+        if (options.profile == 'hotshot'
+            and sys.version_info[:3] <= (2,4,1)):
+            args.insert(1, '-O')
 
-    if sys.platform.startswith('win'):
-        args = args[0] + ' ' + ' '.join([
-            ('"' + a.replace('\\', '\\\\').replace('"', '\\"') + '"')
-            for a in args[1:]
-            ])
+        if sys.platform.startswith('win'):
+            args = args[0] + ' ' + ' '.join([
+                ('"' + a.replace('\\', '\\\\').replace('"', '\\"') + '"')
+                for a in args[1:]
+                ])
 
-    for feature in features:
-        feature.layer_setup(layer)
+        for feature in features:
+            feature.layer_setup(layer)
 
-    subin, subout, suberr = os.popen3(args)
-    while True:
-        try:
-            for l in subout:
-                sys.stdout.write(l)
-        except IOError, e:
-            if e.errno == errno.EINTR:
-                # If the reading the subprocess input is interruped (as
-                # recieving SIGCHLD can), then retry.
+        subin, subout, suberr = os.popen3(args)
+        while True:
+            try:
+                for line in subout:
+                    result.stdout.append(line)
+            except IOError, e:
+                if e.errno == errno.EINTR:
+                    # If reading the subprocess output is interrupted (as can
+                    # be caused by receiving SIGCHLD), then retry.
+                    continue
+                options.output.error(
+                    "Error reading subprocess output for %s" % layer_name)
+                options.output.info(str(e))
+            else:
+                break
+
+        # The subprocess may have spewed any number of things to stderr, so
+        # we'll keep looking until we find the information we're looking for.
+        whole_suberr = ''
+        while True:
+            line = suberr.readline()
+            whole_suberr += line
+            if not line:
+                raise SubprocessError(
+                    'No subprocess summary found', whole_suberr)
+
+            try:
+                result.num_ran, nfail, nerr = map(int, line.strip().split())
+                break
+            except KeyboardInterrupt:
+                raise
+            except:
                 continue
-            options.output.error(
-                "Error reading subprocess output for %s" % layer_name)
-            options.output.info(str(e))
-        else:
-            break
 
-    # Subprocesses may have spewed any number of things to stderr, so we'll
-    # keep looking until we find the information we're looking for.
-    whole_suberr = ''
-    while True:
-        line = suberr.readline()
-        whole_suberr += line
-        if not line:
-            raise SubprocessError(
-                'No subprocess summary found', whole_suberr)
+        while nfail > 0:
+            nfail -= 1
+            failures.append((suberr.readline().strip(), None))
+        while nerr > 0:
+            nerr -= 1
+            errors.append((suberr.readline().strip(), None))
 
-        try:
-            ran, nfail, nerr = map(int, line.strip().split())
-            break
-        except KeyboardInterrupt:
-            raise
-        except:
-            sys.stderr.write(line)
-            continue
+    finally:
+        result.done = True
 
-    while nfail > 0:
-        nfail -= 1
-        failures.append((suberr.readline().strip(), None))
-    while nerr > 0:
-        nerr -= 1
-        errors.append((suberr.readline().strip(), None))
 
-    rans.append(ran)
+class SubprocessResult(object):
+    def __init__(self):
+        self.num_ran = 0
+        self.stdout = []
+        self.done = False
 
 
 def resume_tests(options, features, layers, failures, errors):
-    rans = []
-    resume_number = 0
+    results = []
+    resume_number = int(options.processes > 1)
     ready_threads = []
     for layer_name, layer, tests in layers:
+        result = SubprocessResult()
+        results.append(result)
         ready_threads.append(threading.Thread(
             target=spawn_layer_in_subprocess,
-            args=(rans, options, features, layer_name, layer, failures,
+            args=(result, options, features, layer_name, layer, failures,
                 errors, resume_number)))
         resume_number += 1
 
     # Now start a few threads at a time.
     running_threads = []
+    results_iter = iter(results)
+    current_result = results_iter.next()
     while ready_threads or running_threads:
         while len(running_threads) < options.processes and ready_threads:
             thread = ready_threads.pop(0)
             thread.start()
             running_threads.append(thread)
 
-        for index, thread in list(enumerate(running_threads)):
+        for index, thread in reversed(list(enumerate(running_threads))):
             if not thread.isAlive():
                 del running_threads[index]
-        time.sleep(0.01) # keep the loop from being too tight
 
-    # Gather up all the results.
-    return sum(rans)
+        # We want to display results in the order they would have been
+        # displayed, had the work not been done in parallel.
+        while current_result and current_result.done:
+            map(sys.stdout.write, current_result.stdout)
 
+            try:
+                current_result = results_iter.next()
+            except StopIteration:
+                current_result = None
 
+        time.sleep(0.01) # Keep the loop from being too tight.
+
+    # Return the total number of tests run.
+    return sum(r.num_ran for r in results)
+
+
 def tear_down_unneeded(options, needed, setup_layers, optional=False):
     # Tear down any layers not needed for these tests. The unneeded
     # layers might interfere.

Modified: zope.testing/branches/benji-parallelize-subprocesses/src/zope/testing/testrunner/testrunner-layers.txt
===================================================================
--- zope.testing/branches/benji-parallelize-subprocesses/src/zope/testing/testrunner/testrunner-layers.txt	2008-07-06 00:06:23 UTC (rev 88048)
+++ zope.testing/branches/benji-parallelize-subprocesses/src/zope/testing/testrunner/testrunner-layers.txt	2008-07-06 00:11:03 UTC (rev 88049)
@@ -124,3 +124,52 @@
     Total: 405 tests, 0 failures, 0 errors in N.NNN seconds.
     False
 
+It is possible to force the layers to run in subprocesses and parallelize them.
+
+    >>> sys.argv = [testrunner_script, '-j2']
+    >>> testrunner.run(defaults)
+    Running samplelayers.Layer1 tests:
+      Set up samplelayers.Layer1 in N.NNN seconds.
+      Ran 9 tests with 0 failures and 0 errors in N.NNN seconds.
+    Running samplelayers.Layer11 tests:
+      Running in a subprocess.
+      Set up samplelayers.Layer1 in N.NNN seconds.
+      Set up samplelayers.Layer11 in N.NNN seconds.
+      Ran 34 tests with 0 failures and 0 errors in N.NNN seconds.
+    Running samplelayers.Layer111 tests:
+      Running in a subprocess.
+      Set up samplelayers.Layerx in N.NNN seconds.
+      Set up samplelayers.Layer1 in N.NNN seconds.
+      Set up samplelayers.Layer11 in N.NNN seconds.
+      Set up samplelayers.Layer111 in N.NNN seconds.
+      Ran 34 tests with 0 failures and 0 errors in N.NNN seconds.
+    Running samplelayers.Layer112 tests:
+      Running in a subprocess.
+      Set up samplelayers.Layerx in N.NNN seconds.
+      Set up samplelayers.Layer1 in N.NNN seconds.
+      Set up samplelayers.Layer11 in N.NNN seconds.
+      Set up samplelayers.Layer112 in N.NNN seconds.
+      Ran 34 tests with 0 failures and 0 errors in N.NNN seconds.
+    Running samplelayers.Layer12 tests:
+      Running in a subprocess.
+      Set up samplelayers.Layer1 in N.NNN seconds.
+      Set up samplelayers.Layer12 in N.NNN seconds.
+      Ran 34 tests with 0 failures and 0 errors in N.NNN seconds.
+    Running samplelayers.Layer121 tests:
+      Running in a subprocess.
+      Set up samplelayers.Layer1 in N.NNN seconds.
+      Set up samplelayers.Layer12 in N.NNN seconds.
+      Set up samplelayers.Layer121 in N.NNN seconds.
+      Ran 34 tests with 0 failures and 0 errors in N.NNN seconds.
+    Running samplelayers.Layer122 tests:
+      Running in a subprocess.
+      Set up samplelayers.Layer1 in N.NNN seconds.
+      Set up samplelayers.Layer12 in N.NNN seconds.
+      Set up samplelayers.Layer122 in N.NNN seconds.
+      Ran 34 tests with 0 failures and 0 errors in N.NNN seconds.
+    Running zope.testing.testrunner.layer.UnitTests tests:
+      Running in a subprocess.
+      Set up zope.testing.testrunner.layer.UnitTests in N.NNN seconds.
+      Ran 192 tests with 0 failures and 0 errors in N.NNN seconds.
+    Total: 405 tests, 0 failures, 0 errors in N.NNN seconds.
+    False



More information about the Zope3-Checkins mailing list