[CMF-checkins] SVN: CMF/branches/mj-cmf14-compat-branch/CMFSetup/ Backport TarballImportContext from GS 1.1.

Tres Seaver tseaver at palladion.com
Mon Mar 10 17:02:45 EDT 2008


Log message for revision 84577:
  Backport TarballImportContext from GS 1.1.

Changed:
  U   CMF/branches/mj-cmf14-compat-branch/CMFSetup/context.py
  U   CMF/branches/mj-cmf14-compat-branch/CMFSetup/tests/test_context.py

-=-
Modified: CMF/branches/mj-cmf14-compat-branch/CMFSetup/context.py
===================================================================
--- CMF/branches/mj-cmf14-compat-branch/CMFSetup/context.py	2008-03-10 20:41:54 UTC (rev 84576)
+++ CMF/branches/mj-cmf14-compat-branch/CMFSetup/context.py	2008-03-10 21:02:40 UTC (rev 84577)
@@ -186,6 +186,112 @@
 InitializeClass( DirectoryExportContext )
 
 
+class TarballImportContext( Implicit ):
+
+    __implements__ = ( IImportContext, )
+
+    security = ClassSecurityInfo()
+
+    def __init__( self, tool, archive_bits, encoding=None, should_purge=False ):
+
+        self._site = aq_parent( aq_inner( tool ) )
+        timestamp = time.gmtime()
+        self._archive_stream = StringIO(archive_bits)
+        self._archive = TarFile.open( 'foo.bar', 'r:gz'
+                                    , self._archive_stream )
+        self._encoding = encoding
+        self._should_purge = bool( should_purge )
+
+    security.declareProtected( ManagePortal, 'getSite' )
+    def getSite( self ):
+
+        """ See ISetupContext.
+        """
+        return aq_self(self._site)
+
+    security.declareProtected( ManagePortal, 'getEncoding' )
+    def getEncoding( self ):
+
+        """ See IImportContext.
+        """
+        return self._encoding
+
+    def readDataFile( self, filename, subdir=None ):
+
+        """ See IImportContext.
+        """
+        if subdir is not None:
+            filename = '/'.join( ( subdir, filename ) )
+
+        try:
+            file = self._archive.extractfile( filename )
+        except KeyError:
+            return None
+
+        return file.read()
+
+    def getLastModified( self, path ):
+
+        """ See IImportContext.
+        """
+        info = self._getTarInfo( path )
+        return info and info.mtime or None
+
+    def isDirectory( self, path ):
+
+        """ See IImportContext.
+        """
+        info = self._getTarInfo( path )
+
+        if info is not None:
+            return info.isdir()
+
+    def listDirectory(self, path, skip=()):
+
+        """ See IImportContext.
+        """
+        if path is None:  # root is special case:  no leading '/'
+            path = ''
+        else:
+            if not self.isDirectory(path):
+                return None
+
+            if path[-1] != '/':
+                path = path + '/'
+
+        pfx_len = len(path)
+
+        names = []
+        for name in self._archive.getnames():
+            if name == path or not name.startswith(path):
+                continue
+            name = name[pfx_len:]
+            if '/' in name or name in skip:
+                continue
+            names.append(name)
+
+        return names
+
+    def shouldPurge( self ):
+
+        """ See IImportContext.
+        """
+        return self._should_purge
+
+    def _getTarInfo( self, path ):
+        if path[-1] == '/':
+            path = path[:-1]
+        try:
+            return self._archive.getmember( path )
+        except KeyError:
+            pass
+        try:
+            return self._archive.getmember( path + '/' )
+        except KeyError:
+            return None
+
+
+
 class TarballExportContext( Implicit ):
 
     __implements__ = ( IExportContext, )

Modified: CMF/branches/mj-cmf14-compat-branch/CMFSetup/tests/test_context.py
===================================================================
--- CMF/branches/mj-cmf14-compat-branch/CMFSetup/tests/test_context.py	2008-03-10 20:41:54 UTC (rev 84576)
+++ CMF/branches/mj-cmf14-compat-branch/CMFSetup/tests/test_context.py	2008-03-10 21:02:40 UTC (rev 84577)
@@ -23,6 +23,8 @@
 import os
 import time
 from StringIO import StringIO
+from tarfile import TarFile
+from tarfile import TarInfo
 
 from DateTime.DateTime import DateTime
 from OFS.Folder import Folder
@@ -380,6 +382,304 @@
         self.assertEqual( open( fqname, 'rb' ).read(), digits )
 
 
+class TarballImportContextTests( SecurityRequestTest
+                               , ConformsToISetupContext
+                               , ConformsToIImportContext
+                               ):
+
+    def _getTargetClass( self ):
+
+        from Products.CMFSetup.context import TarballImportContext
+        return TarballImportContext
+
+    def _makeOne( self, file_dict={}, mod_time=None, *args, **kw ):
+
+        archive_stream = StringIO()
+        archive = TarFile.open('test.tar.gz', 'w:gz', archive_stream)
+
+        def _addOneMember(path, data, modtime):
+            stream = StringIO(v)
+            info = TarInfo(k)
+            info.size = len(v)
+            info.mtime = mod_time
+            archive.addfile(info, stream)
+
+        def _addMember(path, data, modtime):
+            from tarfile import DIRTYPE
+            elements = path.split('/')
+            parents = filter(None, [elements[x] for x in range(len(elements))])
+            for parent in parents:
+                info = TarInfo()
+                info.name = parent
+                info.size = 0
+                info.mtime = mod_time
+                info.type = DIRTYPE
+                archive.addfile(info, StringIO())
+            _addOneMember(path, data, modtime)
+
+        file_items = file_dict.items() or [('dummy', '')] # empty archive barfs
+
+        if mod_time is None:
+            mod_time = time.time()
+
+        for k, v in file_items:
+            _addMember(k, v, mod_time)
+
+        archive.close()
+        bits = archive_stream.getvalue()
+
+        site = DummySite( 'site' ).__of__( self.root )
+        site._setObject( 'setup_tool', Folder( 'setup_tool' ) )
+        tool = site._getOb( 'setup_tool' )
+
+        ctx = self._getTargetClass()( tool, bits, *args, **kw )
+
+        return site, tool, ctx.__of__( tool )
+
+    def test_ctorparms( self ):
+
+        ENCODING = 'latin-1'
+        site, tool, ctx = self._makeOne( encoding=ENCODING
+                                       , should_purge=True
+                                       )
+
+        self.assertEqual( ctx.getEncoding(), ENCODING )
+        self.assertEqual( ctx.shouldPurge(), True )
+
+    def test_empty( self ):
+
+        site, tool, ctx = self._makeOne()
+
+        self.assertEqual( ctx.getSite(), site )
+        self.assertEqual( ctx.getEncoding(), None )
+        self.assertEqual( ctx.shouldPurge(), False )
+
+        # These methods are all specified to return 'None' for non-existing
+        # paths / entities
+        self.assertEqual( ctx.isDirectory( 'nonesuch/path' ), None )
+        self.assertEqual( ctx.listDirectory( 'nonesuch/path' ), None )
+
+    def test_readDataFile_nonesuch( self ):
+
+        FILENAME = 'nonesuch.txt'
+
+        site, tool, ctx = self._makeOne()
+
+        self.assertEqual( ctx.readDataFile( FILENAME ), None )
+        self.assertEqual( ctx.readDataFile( FILENAME, 'subdir' ), None )
+
+    def test_readDataFile_simple( self ):
+
+        from string import printable
+
+        FILENAME = 'simple.txt'
+
+        site, tool, ctx = self._makeOne( { FILENAME: printable } )
+
+        self.assertEqual( ctx.readDataFile( FILENAME ), printable )
+
+    def test_readDataFile_subdir( self ):
+
+        from string import printable
+
+        FILENAME = 'subdir.txt'
+        SUBDIR = 'subdir'
+
+        site, tool, ctx = self._makeOne( { '%s/%s' % (SUBDIR, FILENAME):
+                                            printable } )
+
+        self.assertEqual( ctx.readDataFile( FILENAME, SUBDIR ), printable )
+
+    def test_getLastModified_nonesuch( self ):
+
+        FILENAME = 'nonesuch.txt'
+
+        site, tool, ctx = self._makeOne()
+
+        self.assertEqual( ctx.getLastModified( FILENAME ), None )
+
+    def test_getLastModified_simple( self ):
+
+        from string import printable
+
+        FILENAME = 'simple.txt'
+        WHEN = DateTime( '2004-01-01T00:00:00Z' )
+
+        site, tool, ctx = self._makeOne( { FILENAME : printable }
+                                       , mod_time=WHEN )
+
+        self.assertEqual( ctx.getLastModified( FILENAME ), WHEN )
+
+    def test_getLastModified_subdir( self ):
+
+        from string import printable
+
+        FILENAME = 'subdir.txt'
+        SUBDIR = 'subdir'
+        PATH = '%s/%s' % ( SUBDIR, FILENAME )
+        WHEN = DateTime( '2004-01-01T00:00:00Z' )
+
+        site, tool, ctx = self._makeOne( { PATH: printable }
+                                       , mod_time=WHEN )
+
+        self.assertEqual( ctx.getLastModified( PATH ), WHEN )
+
+    def test_getLastModified_directory( self ):
+
+        from string import printable
+
+        FILENAME = 'subdir.txt'
+        SUBDIR = 'subdir'
+        PATH = '%s/%s' % ( SUBDIR, FILENAME )
+        WHEN = DateTime( '2004-01-01T00:00:00Z' )
+
+        site, tool, ctx = self._makeOne( { PATH: printable }
+                                       , mod_time=WHEN
+                                       )
+
+        self.assertEqual( ctx.getLastModified( SUBDIR ), WHEN )
+
+    def test_isDirectory_nonesuch( self ):
+
+        FILENAME = 'nonesuch.txt'
+
+        site, tool, ctx = self._makeOne()
+
+        self.assertEqual( ctx.isDirectory( FILENAME ), None )
+
+    def test_isDirectory_simple( self ):
+
+        from string import printable
+
+        FILENAME = 'simple.txt'
+
+        site, tool, ctx = self._makeOne( { FILENAME: printable } )
+
+        self.assertEqual( ctx.isDirectory( FILENAME ), False )
+
+    def test_isDirectory_nested( self ):
+
+        from string import printable
+
+        SUBDIR = 'subdir'
+        FILENAME = 'nested.txt'
+        PATH = '%s/%s' % ( SUBDIR, FILENAME )
+
+        site, tool, ctx = self._makeOne( { PATH: printable } )
+
+        self.assertEqual( ctx.isDirectory( PATH ), False )
+
+    def test_isDirectory_subdir( self ):
+
+        from string import printable
+
+        SUBDIR = 'subdir'
+        FILENAME = 'nested.txt'
+        PATH = '%s/%s' % ( SUBDIR, FILENAME )
+
+        site, tool, ctx = self._makeOne( { PATH: printable } )
+
+        self.assertEqual( ctx.isDirectory( SUBDIR ), True )
+
+    def test_listDirectory_nonesuch( self ):
+
+        SUBDIR = 'nonesuch/path'
+
+        site, tool, ctx = self._makeOne()
+
+        self.assertEqual( ctx.listDirectory( SUBDIR ), None )
+
+    def test_listDirectory_root( self ):
+
+        from string import printable
+
+        FILENAME = 'simple.txt'
+
+        site, tool, ctx = self._makeOne( { FILENAME: printable } )
+
+        self.assertEqual( len( ctx.listDirectory( None ) ), 1 )
+        self.failUnless( FILENAME in ctx.listDirectory( None ) )
+
+    def test_listDirectory_simple( self ):
+
+        from string import printable
+
+        FILENAME = 'simple.txt'
+
+        site, tool, ctx = self._makeOne( { FILENAME: printable } )
+
+        self.assertEqual( ctx.listDirectory( FILENAME ), None )
+
+    def test_listDirectory_nested( self ):
+
+        from string import printable
+
+        SUBDIR = 'subdir'
+        FILENAME = 'nested.txt'
+        PATH = '%s/%s' % ( SUBDIR, FILENAME )
+
+        site, tool, ctx = self._makeOne( { PATH: printable } )
+
+        self.assertEqual( ctx.listDirectory( PATH ), None )
+
+    def test_listDirectory_single( self ):
+
+        from string import printable
+
+        SUBDIR = 'subdir'
+        FILENAME = 'nested.txt'
+        PATH = '%s/%s' % ( SUBDIR, FILENAME )
+
+        site, tool, ctx = self._makeOne( { PATH: printable } )
+
+        names = ctx.listDirectory( SUBDIR )
+        self.assertEqual( len( names ), 1 )
+        self.failUnless( FILENAME in names )
+
+    def test_listDirectory_multiple( self ):
+
+        from string import printable, uppercase
+
+        SUBDIR = 'subdir'
+        FILENAME1 = 'nested.txt'
+        PATH1 = '%s/%s' % ( SUBDIR, FILENAME1 )
+        FILENAME2 = 'another.txt'
+        PATH2 = '%s/%s' % ( SUBDIR, FILENAME2 )
+
+        site, tool, ctx = self._makeOne( { PATH1: printable
+                                         , PATH2: uppercase
+                                         } )
+                                             
+        names = ctx.listDirectory( SUBDIR )
+        self.assertEqual( len( names ), 2 )
+        self.failUnless( FILENAME1 in names )
+        self.failUnless( FILENAME2 in names )
+
+    def test_listDirectory_skip( self ):
+
+        from string import printable, uppercase
+
+        SUBDIR = 'subdir'
+        FILENAME1 = 'nested.txt'
+        PATH1 = '%s/%s' % ( SUBDIR, FILENAME1 )
+        FILENAME2 = 'another.txt'
+        PATH2 = '%s/%s' % ( SUBDIR, FILENAME2 )
+        FILENAME3 = 'another.bak'
+        PATH3 = '%s/%s' % ( SUBDIR, FILENAME3 )
+
+        site, tool, ctx = self._makeOne( { PATH1: printable
+                                         , PATH2: uppercase
+                                         , PATH3: 'xyz'
+                                         } )
+
+        names = ctx.listDirectory(SUBDIR, skip=(FILENAME1,))
+        self.assertEqual( len( names ), 2 )
+        self.failIf( FILENAME1 in names )
+        self.failUnless( FILENAME2 in names )
+        self.failUnless( FILENAME3 in names )
+
+
+
 class TarballExportContextTests( FilesystemTestBase
                                , TarballTester
                                , ConformsToISetupContext
@@ -977,6 +1277,7 @@
         unittest.makeSuite( DirectoryImportContextTests ),
         unittest.makeSuite( DirectoryExportContextTests ),
         unittest.makeSuite( TarballExportContextTests ),
+        unittest.makeSuite( TarballImportContextTests ),
         unittest.makeSuite( SnapshotExportContextTests ),
         unittest.makeSuite( SnapshotImportContextTests ),
         ))



More information about the CMF-checkins mailing list