[Zodb-checkins] SVN: ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py test passes for 2.4, 2.5 and 2.6; first step

Godefroid Chapelle gotcha at bubblenet.be
Wed Dec 2 11:32:48 EST 2009


Log message for revision 106179:
  Test passes under Python 2.4, 2.5 and 2.6; first step.
  

Changed:
  U   ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py

-=-
Modified: ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py
===================================================================
--- ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py	2009-12-02 16:31:15 UTC (rev 106178)
+++ ZODB/branches/test_repozo/src/ZODB/scripts/tests/test_repozo.py	2009-12-02 16:32:47 UTC (rev 106179)
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 ##############################################################################
 #
-# Copyright (c) 2004 Zope Corporation and Contributors.
+# Copyright (c) 2004-2009 Zope Corporation and Contributors.
 # All Rights Reserved.
 #
 # This software is subject to the provisions of the Zope Public License,
@@ -13,26 +13,9 @@
 #
 ##############################################################################
 
-"""Test repozo.py.
-
-This is a by-hand test.  It succeeds iff it doesn't blow up.  Run it with
-its home directory as the current directory.  It will destroy all files
-matching Data.* and Copy.* in this directory, and anything in a
-subdirectory of name 'backup'.
-
-Usage:
-
-python testrepozo.py [repozo_script]
-
-  repozo_script, if provided, is a path to a script that runs repozo,
-  such as that generated by buildout.
-
-eg:
-$ ../../../../bin/py testrepozo.py ../../../../bin/repozo
-"""
-
 import unittest
 import os
+import sys
 import random
 import time
 import glob
@@ -42,18 +25,29 @@
 from ZODB import FileStorage
 import transaction
 
-def cleanup():
-    for fname in glob.glob('Data.*') + glob.glob('Copy.*'):
+from ZODB.scripts import tests as tests_module
+
+REPOZO = os.path.join(os.path.dirname(sys.argv[0]), 'repozo')
+
+
+def cleanup(basedir):
+    globData = os.path.join(basedir, 'Data.*')
+    globCopy = os.path.join(basedir, 'Copy.*')
+    backupDir = os.path.join(basedir, 'backup')
+    for fname in glob.glob(globData) + glob.glob(globCopy):
         os.remove(fname)
 
-    if os.path.isdir('backup'):
-        for fname in os.listdir('backup'):
-            os.remove(os.path.join('backup', fname))
-        os.rmdir('backup')
+    if os.path.isdir(backupDir):
+        for fname in os.listdir(backupDir):
+            os.remove(os.path.join(backupDir, fname))
+        os.rmdir(backupDir)
 
+
 class OurDB:
-    def __init__(self):
+
+    def __init__(self, basedir):
         from BTrees.OOBTree import OOBTree
+        self.basedir = basedir
         self.getdb()
         conn = self.db.open()
         conn.root()['tree'] = OOBTree()
@@ -61,7 +55,8 @@
         self.close()
 
     def getdb(self):
-        storage = FileStorage.FileStorage('Data.fs')
+        storage_filename = os.path.join(self.basedir, 'Data.fs')
+        storage = FileStorage.FileStorage(storage_filename)
         self.db = ZODB.DB(storage)
 
     def gettree(self):
@@ -78,13 +73,27 @@
             self.db.close()
             self.db = None
 
+    def mutate(self):
+        # Make random mutations to the btree in the database.
+        tree = self.gettree()
+        for dummy in range(100):
+            if random.random() < 0.6:
+                tree[random.randrange(100000)] = random.randrange(100000)
+            else:
+                keys = tree.keys()
+                if keys:
+                    del tree[keys[0]]
+        transaction.commit()
+        self.close()
+
+
 # Do recovery to time 'when', and check that it's identical to correctpath.
 def check(correctpath='Data.fs', when=None):
     if when is None:
         extra = ''
     else:
         extra = ' -D ' + when
-    cmd = PYTHON + REPOZO + ' -vRr backup -o Copy.fs' + extra
+    cmd = REPOZO + ' -vRr backup -o Copy.fs' + extra
     os.system(cmd)
     f = file(correctpath, 'rb')
     g = file('Copy.fs', 'rb')
@@ -97,30 +106,16 @@
                          "    correctpath=%r when=%r\n"
                          "    cmd=%r" % (correctpath, when, cmd))
 
-def mutatedb(db):
-    # Make random mutations to the btree in the database.
-    tree = db.gettree()
-    for dummy in range(100):
-        if random.random() < 0.6:
-            tree[random.randrange(100000)] = random.randrange(100000)
-        else:
-            keys = tree.keys()
-            if keys:
-                del tree[keys[0]]
-    transaction.commit()
-    db.close()
 
-def main():
-    cleanup()
-    os.mkdir('backup')
-    d = OurDB()
+def main(basedir, d):
     # Every 9th time thru the loop, we save a full copy of Data.fs,
     # and at the end we ensure we can reproduce those too.
     saved_snapshots = []  # list of (name, time) pairs for copies.
 
     for i in range(100):
+        print i
         # Make some mutations.
-        mutatedb(d)
+        d.mutate()
 
         # Pack about each tenth time.
         if random.random() < 0.1:
@@ -130,14 +125,15 @@
 
         # Make an incremental backup, half the time with gzip (-z).
         if random.random() < 0.5:
-            os.system(PYTHON + REPOZO + ' -vBQr backup -f Data.fs')
+            os.system(REPOZO + ' -vBQr backup -f Data.fs')
         else:
-            os.system(PYTHON + REPOZO + ' -zvBQr backup -f Data.fs')
+            os.system(REPOZO + ' -zvBQr backup -f Data.fs')
 
         if i % 9 == 0:
             copytime = '%04d-%02d-%02d-%02d-%02d-%02d' % (time.gmtime()[:6])
-            copyname = os.path.join('backup', "Data%d" % i) + '.fs'
-            shutil.copyfile('Data.fs', copyname)
+            copyname = os.path.join(basedir, 'backup', "Data%d" % i) + '.fs'
+            srcname = os.path.join(basedir, 'Data.fs')
+            shutil.copyfile(srcname, copyname)
             saved_snapshots.append((copyname, copytime))
 
         # Make sure the clock moves at least a second.
@@ -151,22 +147,23 @@
         print "Checking that", copyname, "at", copytime, "is reproducible."
         check(copyname, copytime)
 
-    # Tear it all down.
-    cleanup()
-    print 'Test passed!'
 
-
-
 class RepozoTest(unittest.TestCase):
 
     def setUp(self):
-        pass
+        self.basedir = os.path.dirname(tests_module.__file__)
+        self.currdir = os.getcwd()
+        os.chdir(self.basedir)
+        cleanup(self.basedir)
+        os.mkdir(os.path.join(self.basedir, 'backup'))
+        self.d = OurDB(self.basedir)
 
     def tearDown(self):
-        pass
+        cleanup(self.basedir)
+        os.chdir(self.currdir)
 
     def testDummy(self):
-        self.failUnless(True)
+        main(self.basedir, self.d)
 
 
 def test_suite():



More information about the Zodb-checkins mailing list