[Zodb-checkins] SVN: ZODB/branches/tseaver-better_repozo_tests/src/ZODB/scripts/tests/test_repozo.py Add tests for 'dofile'.

Tres Seaver tseaver at palladion.com
Fri May 14 13:45:41 EDT 2010


Log message for revision 112307:
  Add tests for 'dofile'.

Changed:
  U   ZODB/branches/tseaver-better_repozo_tests/src/ZODB/scripts/tests/test_repozo.py

-=-
Modified: ZODB/branches/tseaver-better_repozo_tests/src/ZODB/scripts/tests/test_repozo.py
===================================================================
--- ZODB/branches/tseaver-better_repozo_tests/src/ZODB/scripts/tests/test_repozo.py	2010-05-14 17:44:37 UTC (rev 112306)
+++ ZODB/branches/tseaver-better_repozo_tests/src/ZODB/scripts/tests/test_repozo.py	2010-05-14 17:45:41 UTC (rev 112307)
@@ -77,109 +77,50 @@
         self.close()
 
 
-class RepozoTests(unittest.TestCase):
+class Test_dofile(unittest.TestCase):
 
-    layer = ZODB.tests.util.MininalTestLayer('repozo')
+    def _callFUT(self, func, fp, n):
+        from ZODB.scripts.repozo import dofile
+        return dofile(func, fp, n)
 
-    def setUp(self):
-        # compute directory names
-        import tempfile
-        self.basedir = tempfile.mkdtemp()
-        self.backupdir = os.path.join(self.basedir, 'backup')
-        self.datadir = os.path.join(self.basedir, 'data')
-        self.restoredir = os.path.join(self.basedir, 'restore')
-        self.copydir = os.path.join(self.basedir, 'copy')
-        self.currdir = os.getcwd()
-        # create empty directories
-        os.mkdir(self.backupdir)
-        os.mkdir(self.datadir)
-        os.mkdir(self.restoredir)
-        os.mkdir(self.copydir)
-        os.chdir(self.datadir)
-        self.db = OurDB(self.datadir)
+    def _makeChunks(self):
+        from ZODB.scripts.repozo import READCHUNK
+        return ['x' * READCHUNK, 'y' * READCHUNK, 'z']
 
-    def tearDown(self):
-        os.chdir(self.currdir)
-        import shutil
-        shutil.rmtree(self.basedir)
+    def _makeFile(self, text=None):
+        from StringIO import StringIO
+        if text is None:
+            text = ''.join(self._makeChunks())
+        return StringIO(text)
 
-    def _callRepozoMain(self, argv):
-        from ZODB.scripts.repozo import main
-        main(argv)
+    def test_empty_read_all(self):
+        chunks = []
+        file = self._makeFile('')
+        bytes = self._callFUT(chunks.append, file, None)
+        self.assertEqual(bytes, 0)
+        self.assertEqual(chunks, [])
 
-    def test_via_monte_carlo(self):
-        self.saved_snapshots = []  # list of (name, time) pairs for copies.
+    def test_empty_read_count(self):
+        chunks = []
+        file = self._makeFile('')
+        bytes = self._callFUT(chunks.append, file, 42)
+        self.assertEqual(bytes, 0)
+        self.assertEqual(chunks, [])
 
-        for i in range(100):
-            self.mutate_pack_backup(i)
+    def test_nonempty_read_all(self):
+        chunks = []
+        file = self._makeFile()
+        bytes = self._callFUT(chunks.append, file, None)
+        self.assertEqual(bytes, file.tell())
+        self.assertEqual(chunks, self._makeChunks())
 
-        # Verify snapshots can be reproduced exactly.
-        for copyname, copytime in self.saved_snapshots:
-            if _NOISY:
-                print "Checking that", copyname,
-                print "at", copytime, "is reproducible."
-            self.assertRestored(copyname, copytime)
+    def test_nonempty_read_count(self):
+        chunks = []
+        file = self._makeFile()
+        bytes = self._callFUT(chunks.append, file, 42)
+        self.assertEqual(bytes, 42)
+        self.assertEqual(chunks, ['x' * 42])
 
-    def mutate_pack_backup(self, i):
-        import random
-        from shutil import copyfile
-        from time import gmtime
-        from time import sleep
-        self.db.mutate()
-
-        # Pack about each tenth time.
-        if random.random() < 0.1:
-            if _NOISY:
-                print "packing"
-            self.db.pack()
-            self.db.close()
-
-        # Make an incremental backup, half the time with gzip (-z).
-        argv = ['-BQr', self.backupdir, '-f', 'Data.fs']
-        if _NOISY:
-            argv.insert(0, '-v')
-        if random.random() < 0.5:
-            argv.insert(0, '-z')
-        self._callRepozoMain(argv)
-
-        # Save snapshots to assert that dated restores are possible
-        if i % 9 == 0:
-            srcname = os.path.join(self.datadir, 'Data.fs')
-            copytime = '%04d-%02d-%02d-%02d-%02d-%02d' % (gmtime()[:6])
-            copyname = os.path.join(self.copydir, "Data%d.fs" % i)
-            copyfile(srcname, copyname)
-            self.saved_snapshots.append((copyname, copytime))
-
-        # Make sure the clock moves at least a second.
-        sleep(1.01)
-
-        # Verify current Data.fs can be reproduced exactly.
-        self.assertRestored()
-
-    def assertRestored(self, correctpath='Data.fs', when=None):
-    # Do recovery to time 'when', and check that it's identical to correctpath.
-        # restore to Restored.fs
-        restoredfile = os.path.join(self.restoredir, 'Restored.fs')
-        argv = ['-Rr', self.backupdir, '-o', restoredfile]
-        if _NOISY:
-            argv.insert(0, '-v')
-        if when is not None:
-            argv.append('-D')
-            argv.append(when)
-        self._callRepozoMain(argv)
-
-        # check restored file content is equal to file that was backed up
-        f = file(correctpath, 'rb')
-        g = file(restoredfile, 'rb')
-        fguts = f.read()
-        gguts = g.read()
-        f.close()
-        g.close()
-        msg = ("guts don't match\ncorrectpath=%r when=%r\n cmd=%r" %
-            (correctpath, when, ' '.join(argv)))
-        self.assertEquals(fguts, gguts, msg)
-
-
 class TestBase:
 
     _repository_directory = None
@@ -422,10 +363,114 @@
                              md5(increment).hexdigest()))
 
 
+class MonteCarloTests(unittest.TestCase):
+
+    layer = ZODB.tests.util.MininalTestLayer('repozo')
+
+    def setUp(self):
+        # compute directory names
+        import tempfile
+        self.basedir = tempfile.mkdtemp()
+        self.backupdir = os.path.join(self.basedir, 'backup')
+        self.datadir = os.path.join(self.basedir, 'data')
+        self.restoredir = os.path.join(self.basedir, 'restore')
+        self.copydir = os.path.join(self.basedir, 'copy')
+        self.currdir = os.getcwd()
+        # create empty directories
+        os.mkdir(self.backupdir)
+        os.mkdir(self.datadir)
+        os.mkdir(self.restoredir)
+        os.mkdir(self.copydir)
+        os.chdir(self.datadir)
+        self.db = OurDB(self.datadir)
+
+    def tearDown(self):
+        os.chdir(self.currdir)
+        import shutil
+        shutil.rmtree(self.basedir)
+
+    def _callRepozoMain(self, argv):
+        from ZODB.scripts.repozo import main
+        main(argv)
+
+    def test_via_monte_carlo(self):
+        self.saved_snapshots = []  # list of (name, time) pairs for copies.
+
+        for i in range(100):
+            self.mutate_pack_backup(i)
+
+        # Verify snapshots can be reproduced exactly.
+        for copyname, copytime in self.saved_snapshots:
+            if _NOISY:
+                print "Checking that", copyname,
+                print "at", copytime, "is reproducible."
+            self.assertRestored(copyname, copytime)
+
+    def mutate_pack_backup(self, i):
+        import random
+        from shutil import copyfile
+        from time import gmtime
+        from time import sleep
+        self.db.mutate()
+
+        # Pack about each tenth time.
+        if random.random() < 0.1:
+            if _NOISY:
+                print "packing"
+            self.db.pack()
+            self.db.close()
+
+        # Make an incremental backup, half the time with gzip (-z).
+        argv = ['-BQr', self.backupdir, '-f', 'Data.fs']
+        if _NOISY:
+            argv.insert(0, '-v')
+        if random.random() < 0.5:
+            argv.insert(0, '-z')
+        self._callRepozoMain(argv)
+
+        # Save snapshots to assert that dated restores are possible
+        if i % 9 == 0:
+            srcname = os.path.join(self.datadir, 'Data.fs')
+            copytime = '%04d-%02d-%02d-%02d-%02d-%02d' % (gmtime()[:6])
+            copyname = os.path.join(self.copydir, "Data%d.fs" % i)
+            copyfile(srcname, copyname)
+            self.saved_snapshots.append((copyname, copytime))
+
+        # Make sure the clock moves at least a second.
+        sleep(1.01)
+
+        # Verify current Data.fs can be reproduced exactly.
+        self.assertRestored()
+
+    def assertRestored(self, correctpath='Data.fs', when=None):
+    # Do recovery to time 'when', and check that it's identical to correctpath.
+        # restore to Restored.fs
+        restoredfile = os.path.join(self.restoredir, 'Restored.fs')
+        argv = ['-Rr', self.backupdir, '-o', restoredfile]
+        if _NOISY:
+            argv.insert(0, '-v')
+        if when is not None:
+            argv.append('-D')
+            argv.append(when)
+        self._callRepozoMain(argv)
+
+        # check restored file content is equal to file that was backed up
+        f = file(correctpath, 'rb')
+        g = file(restoredfile, 'rb')
+        fguts = f.read()
+        gguts = g.read()
+        f.close()
+        g.close()
+        msg = ("guts don't match\ncorrectpath=%r when=%r\n cmd=%r" %
+            (correctpath, when, ' '.join(argv)))
+        self.assertEquals(fguts, gguts, msg)
+
+
 def test_suite():
     return unittest.TestSuite([
-        unittest.makeSuite(RepozoTests),
+        unittest.makeSuite(Test_dofile),
         unittest.makeSuite(Test_delete_old_backups),
         unittest.makeSuite(Test_do_full_backup),
         unittest.makeSuite(Test_do_incremental_backup),
+        unittest.makeSuite(MonteCarloTests),
     ])



More information about the Zodb-checkins mailing list