[Zope-Checkins] CVS: Zope/lib/python/Products/ZCatalog/regressiontests - keywords.py:1.2.2.1 loadmail.py:1.2.2.1 regressionCatalog.py:1.2.2.1 regressionCatalogTiming.py:1.2.2.1 unittest_patched.py:1.2.2.1

Shane Hathaway shane@digicool.com
Thu, 9 Aug 2001 13:33:41 -0400


Update of /cvs-repository/Zope/lib/python/Products/ZCatalog/regressiontests
In directory cvs.zope.org:/tmp/cvs-serv29115/lib/python/Products/ZCatalog/regressiontests

Added Files:
      Tag: NR-branch
	keywords.py loadmail.py regressionCatalog.py 
	regressionCatalogTiming.py unittest_patched.py 
Log Message:
Sync NR-branch with trunk.  Sorry about so many checkin messages...


=== Added File Zope/lib/python/Products/ZCatalog/regressiontests/keywords.py ===
import rfc822,mailbox,cPickle,string

class Keywords:
    """Collect purely alphabetic words from mail subject headers.

    The class reads rfc822 messages from a Unix mailbox file, extracts the
    words of every subject header and keeps each distinct word once.  It is
    used for testing purposes only.
    """

    def __init__(self):
        # Distinct keywords in the order in which they were first seen.
        self.kw = []

    def _is_alphabetic(self, word):
        """Return 1 if every character of `word` is in string.letters, else 0.

        An empty string has no offending character and is therefore accepted.
        """
        for c in word:
            if c not in string.letters:
                return 0
        return 1

    def build(self, mbox, limit):
        """Fill the keyword list from mailbox file `mbox` and pickle it.

        Messages are read until the mailbox is exhausted or at least
        `limit` keywords have been collected.  The limit is only checked
        between messages, so the final list may be somewhat longer than
        `limit`.  The result is pickled to the file 'data/keywords'.
        """
        mbox_file = open(mbox)
        try:
            mb = mailbox.UnixMailbox(mbox_file)
            msg = mb.next()

            while msg and len(self.kw) < limit:
                for word in msg.dict.get("subject", "").split(' '):
                    if self._is_alphabetic(word) and word not in self.kw:
                        self.kw.append(word)

                msg = mb.next()
        finally:
            # Release the mailbox file even if reading a message fails.
            mbox_file.close()

        out = open('data/keywords', 'w')
        try:
            cPickle.Pickler(out).dump(self.kw)
        finally:
            # Closing flushes the pickle to disk before build() returns.
            out.close()

    def reload(self):
        """Replace the keyword list with the one pickled in 'data/keywords'."""
        src = open('data/keywords', 'r')
        try:
            self.kw = cPickle.Unpickler(src).load()
        finally:
            src.close()

    def keywords(self):
        """Return the current list of keywords (the list itself, not a copy)."""
        return self.kw
        
        


if __name__ == "__main__":
    # Script mode: rebuild the pickled keyword list from the mailbox at a
    # hard-coded path, using a keyword limit of 1000.
    Keywords().build("/home/andreas/zope.mbox", 1000)

        


=== Added File Zope/lib/python/Products/ZCatalog/regressiontests/loadmail.py === (578/678 lines abridged)
"""Test script for exercising various catalog features under load

Usage:
    cd lib/python
    python Products/ZCatalog/tests/loadmail.py command args

where each command has its own command-line arguments that it expects.

Note that all of the commands operate on the Zope database,
typically var/Data.fs.

Note that this script uses the proc file system to get memory size.

Many of the commands output performance statistics on lines that look like::

      11.3585170507 0.06 2217781L 7212

where the numbers are:

        - clock time in seconds

        - cpu time used by the main thread, in seconds,

        - Database size growth over the test

        - Memory growth during the test (if the proc file system is available).

Commands:

    base mbox max

      Build a base database by:

        - Deleting ../../Data.fs

        - Starting Zope

        - Adding a top-level folder named 'mail'

        - Reading up to max messages from the Unix mailbox file, mbox
          and adding them as documents to the mail folder.


    index threshold

       Index all of the DTML documents in the database, committing
       sub-transactions after each threshold objects.

       If the threshold is less than the number of messages, then the
       size of the temporary sub-transaction commit file is output.

[-=- -=- -=- 578 lines omitted -=- -=- -=-]

        else:
            def returnf(t, c, size, mem, r, lock=lock):
                print c, r
        edits=[0]
        while len(edits) <= nedit:
            edit=whrandom.randint(0, number_of_messages)
            if not alledits.has_key(edit):
                alledits[edit]=1
                edits.append(edit)
        #print edits
        argss.append((lock, (edits, wait, ndel, nins), returnf))

    for lock, args, returnf in argss:
        start_new_thread(do, (Zope.DB, incedit, args, returnf))

    for lock, args, returnf in argss:
        lock.acquire()

    t=time.time() - t
    c=time.clock() - c
    size=db.getSize()-size
    mem=VmSize()-mem

    print t, c, size, mem

    #hist("e%s" % (threads))
    
    Zope.DB.close()

def VmSize():
    """Return the number on the 'VmSize:' line of /proc/<pid>/status.

    The status file of the current process is read and the first field
    following the 'VmSize:' label of the first such line is returned as an
    integer.  Returns 0 when the file cannot be opened (for example when
    no proc file system is available) or when no usable 'VmSize:' line is
    found.
    """
    try:
        f = open('/proc/%s/status' % os.getpid())
    except (IOError, OSError):
        # No proc file system: memory figures are reported as 0.
        return 0
    try:
        lines = f.readlines()
    finally:
        # Always release the file handle; this is called repeatedly.
        f.close()
    for line in lines:
        if line[:7] == 'VmSize:':
            fields = line[7:].split()
            if fields:
                return int(fields[0])
            break
    return 0

def pdebug():
    """Run another command of this script under the pdb debugger.

    sys.argv[1] is removed so that the token following it becomes the new
    sys.argv[1]; the statement handed to pdb then looks that name up in
    globals() and calls it.
    """
    import pdb
    sys.argv.pop(1)
    pdb.run('globals()[sys.argv[1]]()')

if __name__=='__main__':
    try: f=globals()[sys.argv[1]]
    except:
        print __doc__
        sys.exit(1)
    else: f()


=== Added File Zope/lib/python/Products/ZCatalog/regressiontests/regressionCatalog.py === (610/710 lines abridged)
#!/usr/bin/env python

# Regression test for ZCatalog


import os,sys
# Look for importable packages in the current directory first.
sys.path.insert(0,'.')

# If the Testing package cannot be imported from here, retry with the
# directory three levels up as the first search path entry.
try:
    import Testing
except ImportError:
    sys.path[0] = "../../.."
    import Testing

# Set before the Zope imports further down in this file.
os.environ['STUPID_LOG_FILE']= "debug.log"

# Working directory at import time.
here = os.getcwd()

import Zope
import ZODB, ZODB.FileStorage
from Products.ZCatalog import ZCatalog,Vocabulary
from Products.ZCatalog.Catalog import CatalogError
import Persistence
import ExtensionClass
from Testing import dispatcher
import keywords
from zLOG import LOG


import getopt,whrandom,time,string,mailbox,rfc822
import unittest_patched as unittest

# maximum number of files to read for the test suite
maxFiles = 1000

# maximum number of threads for stress tests
numThreads = 4


# number of iterations for searches
searchIterations = 1000

# number of iterations for catalog/uncatalog operations
updateIterations = 100

# input mailbox files; each default path can be overridden through the
# environment variable named in the lookup (TESTCATALOG_MBOX and
# TESTCATALOG_MBOX2)
mbox   = os.environ.get("TESTCATALOG_MBOX","/usr/home/andreas/zope.mbox")
mbox2  = os.environ.get("TESTCATALOG_MBOX2", "/usr/home/andreas/python.mbox")

# data directory handed to BuildEnv when the test lists are assembled;
# empty by default
dataDir = ""

[-=- -=- -=- 610 lines omitted -=- -=- -=-]

         testSearches("testFieldIndex",numThreads= 4),
         testSearches("testFieldRangeIndex",numThreads=1),
         testSearches("testFieldRangeIndex",numThreads= 4),
         testSearches("testKeywordIndex",numThreads= 1),
         testSearches("testKeywordIndex",numThreads= 4),
         testSearches("testKeywordRangeIndex",numThreads= 1),
         testSearches("testKeywordRangeIndex",numThreads=4)
    )

    bench2_tests = (
#       testSearches("testReindexing",numThreads=1),
#        testSearches("testIncrementalIndexing",numThreads=1),
        testSearches("testUpdates",numThreads=2,numUpdates=200),
#        testSearches("testUpdates",numThreads=4,numUpdates=200)
    )

    exp_tests = (
#        testRS("testRangeSearch"),
#       testSearches("testReindexing",numThreads=1),
         testSearches("testReindexingAndModify",numThreads=1),
#        testSearches("testUpdates",numThreads=10,numUpdates=100),
    )
            
    init_tests = ( 
        BuildEnv("buildTestEnvironment",dataDir,maxFiles) ,
    )

    ts = unittest.TestSuite()
    for x in eval('%s_tests' % what): ts.addTest(x)
    return ts

    return



def debug():
    """Build the suite with test_suite() and call its debug() method."""
    test_suite().debug()


def pdebug():
    """Run debug() under the control of the pdb debugger.

    An earlier definition of pdebug() that merely called test_suite() was
    removed: it was always replaced by this definition at import time and
    could therefore never be called.
    """
    import pdb
    pdb.run('debug()')


if __name__ == '__main__':
    # Executed as a script: hand control to main().
    main()



=== Added File Zope/lib/python/Products/ZCatalog/regressiontests/regressionCatalogTiming.py ===
import os, sys
# Look for importable packages in the current directory first.
sys.path.insert(0, '.')
# If the Testing package is importable from here, keep an existing
# SOFTWARE_HOME or default it to '.'.  Otherwise retry the import with the
# directory three levels up and use that directory as SOFTWARE_HOME.
try:
    import Testing
    os.environ['SOFTWARE_HOME']=os.environ.get('SOFTWARE_HOME', '.')
except ImportError:
    sys.path[0]='../../..'
    import Testing
    os.environ['SOFTWARE_HOME']='../../..'

# Keep an existing INSTANCE_HOME; otherwise default to two levels above
# SOFTWARE_HOME.
os.environ['INSTANCE_HOME']=os.environ.get(
    'INSTANCE_HOME',
    os.path.join(os.environ['SOFTWARE_HOME'],'..','..')
    )

# The log file is placed at var/debug.log below INSTANCE_HOME.
os.environ['STUPID_LOG_FILE']=os.path.join(os.environ['INSTANCE_HOME'],'var',
                                           'debug.log')
# Working directory at import time; the tests look for zope.mbox here.
here = os.getcwd()

import Zope
import mailbox, time, httplib
from string import strip, find, split, lower, atoi, join
from urllib import quote
from Products.ZCatalog import ZCatalog
from unittest import TestCase, TestSuite, JUnitTextTestRunner,\
     VerboseTextTestRunner, makeSuite

from Products.PluginIndexes.FieldIndex.FieldIndex import FieldIndex
from Products.PluginIndexes.TextIndex.TextIndex import TextIndex
from Products.PluginIndexes.TextIndex.Lexicon import  Lexicon
from Products.PluginIndexes.KeywordIndex.KeywordIndex import KeywordIndex

from Testing.makerequest import makerequest

# main() instantiates the runner through this alias, so the verbose
# runner is the one that gets used.
TextTestRunner = VerboseTextTestRunner

class TestTimeIndex(TestCase):
    """Timing checks for loading mail messages into the ZODB and cataloging them.

    The check* methods read messages from a file named 'zope.mbox' located
    in `here` (the working directory captured at import time) and report
    elapsed wall-clock times through out().
    """

    def setUp(self):
        # Build a fresh fixture: remove any leftover 'catalogtest' folder,
        # recreate it and put a ZCatalog with the id 'catalog' inside.
        self.app = makerequest(Zope.app())
        try: self.app._delObject('catalogtest')
        except AttributeError: pass
        self.app.manage_addFolder('catalogtest')
        zcatalog = ZCatalog.ZCatalog('catalog', 'a catalog')
        self.app.catalogtest._setObject('catalog', zcatalog)
        c = self.app.catalogtest.catalog
        # Drop any index that already exists under one of the names used
        # below (failures are ignored), then add the indexes the checks use.
        for x in ('title', 'to', 'from', 'date', 'raw'):
            try: c.manage_delIndexes([x])
            except: pass
        c.manage_addIndex('title', 'TextIndex')
        c.manage_addIndex('to', 'TextIndex')
        c.manage_addIndex('from', 'TextIndex')
        c.manage_addIndex('date', 'FieldIndex')
        c.manage_addIndex('raw', 'TextIndex')
        
    def tearDown(self):
        # Remove the fixture folder, then pack the database and close the
        # connection.  AttributeError is swallowed in both steps, which
        # also lets tearDown run again after self.app has already been
        # deleted (checkTimeSubcommit calls it explicitly).
        try: self.app._delObject('catalogtest')
        except AttributeError: pass
        try:
            self.app._p_jar._db.pack()
            self.app._p_jar.close()
        except AttributeError: pass
        # Drop the reference to the application object.
        self.app = None
        del self.app
      
    def checkTimeBulkIndex(self):
        # Phase 1: load up to 500 messages without passing a catalog, so
        # loadmail() only creates the documents.
        print
        c = self.app.catalogtest.catalog
        t = time.time()
        loadmail(self.app.catalogtest, 'zopemail',
                 os.path.join(here, 'zope.mbox'), 500)
        get_transaction().commit()
        loadtime = time.time() - t
        out("loading data took %s seconds.. " % loadtime)
        # Phase 2: time one manage_catalogFoundItems() call restricted to
        # the 'DTML Document' meta type.
        t = time.time()
        req = self.app.REQUEST
        parents = [self.app.catalogtest.catalog,
                   self.app.catalogtest, self.app]
        req['PARENTS'] = parents
        rsp = self.app.REQUEST.RESPONSE
        url1 = ''
        c.manage_catalogFoundItems(req, rsp, url1, url1,
                                   obj_metatypes=['DTML Document'])
        indextime = time.time() - t
        out("bulk index took %s seconds.. " % indextime)
        out("total time for load and index was %s seconds.. "
            % (loadtime + indextime))

    def checkTimeIncrementalIndexAndQuery(self):
        # The catalog is passed to loadmail(), so every document is
        # cataloged right after it has been created; the time reported
        # covers loading and indexing together.
        print
        c = self.app.catalogtest.catalog
        t = time.time()
        max = 500
        m = loadmail(self.app.catalogtest, 'zopemail',
                     os.path.join(here, 'zope.mbox'), max, c)
        get_transaction().commit()
        total = time.time() - t
        out("total time for load and index was %s seconds.. " % total)
        # NOTE(review): this timestamp is not read again in this method.
        t = time.time()
        rs = c() # empty query should return all
        assert len(rs) == max, len(rs)
        # Header values collected by loadmail(), keyed by index name.
        dates = m['date']
        froms = m['from']
        tos =m['to']
        titles = m['title']
        assert len(c({'date':'foobarfoo'})) == 0 # should return no results
        for x in dates:
            assert len(c({'date':x})) == 1 # each date should be fieldindexed
        assert len(c({'from':'a'})) == 0 # should be caught by splitter
        assert len(c({'raw':'chris'})) != 0
        assert len(c({'raw':'gghdjkasjdsda'})) == 0
        assert c({'PrincipiaSearchSource':'the*'})
    
    def checkTimeSubcommit(self):
        # Repeat the load-and-index run once per catalog threshold value.
        print
        for x in (None,100,500,1000,10000):
            out("testing subcommit at theshhold of %s" % x)
            # The first pass (x is None) uses the fixture that already
            # exists; each pass ends with tearDown(), so later passes have
            # to rebuild the fixture first.
            if x is not None:
                self.setUp()
            c = self.app.catalogtest.catalog
            c.threshold = x
            get_transaction().commit()
            t = time.time()
            loadmail(self.app.catalogtest, 'zopemail',
                     os.path.join(here, 'zope.mbox'), 500, c)
            get_transaction().commit()
            total = time.time() - t
            out("total time with subcommit thresh %s was %s seconds.. "
                % (x,total))
            self.tearDown()


# utility

def loadmail(folder, name, mbox, max=None, catalog=None):
    """
    Load mail messages from a Unix mailbox into the ZODB as DTML documents.

    Creates a folder named 'name' inside object 'folder', opens the file
    named 'mbox' and adds its messages as DTML documents below the new
    folder.  Documents are spread over sub-folders of 100 documents each,
    named '0', '100', '200' and so on.  The subject header is stored as
    the document title, and the 'to', 'from' and 'date' headers are stored
    as string properties.

    If 'catalog' (which should be a ZCatalog object) is passed in,
    catalog_object is called on it for every document as it is created.
    If 'max' is not None, at most 'max' messages are loaded (none at all
    when 'max' is zero or negative), else all messages in the mbox
    archive are loaded.

    Returns a dictionary with the keys 'date', 'from', 'to' and 'title',
    each mapping to the list of header values seen, in message order.
    """
    m = {'date':[],'from':[],'to':[],'title':[]}
    folder.manage_addFolder(name)
    folder = getattr(folder, name)
    if max is not None and max <= 0:
        # Nothing was requested; the (empty) folder has still been created.
        return m

    every = 100
    i = 0
    mbox_file = open(mbox)
    try:
        mb = mailbox.UnixMailbox(mbox_file)
        message = mb.next()
        while message:
            # Name of the bucket folder: the document index rounded down
            # to a whole number of buckets, times 100.
            part = str(divmod(i, every)[0] * 100)
            try:
                dest = getattr(folder, part)
            except AttributeError:
                folder.manage_addFolder(part)
                dest = getattr(folder, part)
            dest.manage_addDTMLDocument(str(i), file=message.fp.read())
            doc = getattr(dest, str(i))
            i = i + 1
            for h in message.headers:
                h = strip(h)
                l = find(h, ':')
                if l <= 0: continue
                header = lower(h[:l])
                if header == 'subject': header = 'title'
                value = strip(h[l+1:])
                if header == 'title':
                    doc.manage_changeProperties(title=value)
                    m[header].append(value)
                elif header in ('to', 'from', 'date'):
                    # Best effort: a property that cannot be added must
                    # not abort the load.  The bare except is deliberate,
                    # because `except Exception` would not catch an
                    # old-style string exception should the callee raise
                    # one.
                    try: doc.manage_addProperty(header, value, 'string')
                    except: pass
                    m[header].append(value)
            if catalog:
                path = join(doc.getPhysicalPath(), '/')
                catalog.catalog_object(doc, path)
            if max is not None and i >= max:
                break
            message = mb.next()
    finally:
        # Release the mailbox file whether or not the load succeeded.
        mbox_file.close()
    return m

def out(s):
    print "   %s" % s

def test_suite():
    """Return a TestSuite holding the suite makeSuite() builds from
    TestTimeIndex with the method prefix 'check'."""
    return TestSuite((makeSuite(TestTimeIndex, 'check'),))

def main():
    mb = os.path.join(here, 'zope.mbox')
    if not os.path.isfile(mb):
        print "do you want to get the zope.mbox file from lists.zope.org?"
        print "it's required for testing (98MB, ~ 30mins on fast conn)"
        print "it's also available at korak:/home/chrism/zope.mbox" 
        print "-- type 'Y' or 'N'"
        a = raw_input()
        if lower(a[:1]) == 'y':
            server = 'lists.zope.org:80'
            method = '/pipermail/zope.mbox/zope.mbox'
            h = httplib.HTTP(server)
            h.putrequest('GET', method)
            h.putheader('User-Agent', 'silly')
            h.putheader('Accept', 'text/html')
            h.putheader('Accept', 'text/plain')
            h.putheader('Host', server)
            h.endheaders()
            errcode, errmsg, headers = h.getreply()
            if errcode != 200:
                f = h.getfile()
                data = f.read()
                print data
                raise "Error reading from host %s" % server
            f = h.getfile()
            out=open(mb,'w')
            print "this is going to take a while..."
            print "downloading mbox from %s" % server
            while 1:
                l = f.readline()
                if not l: break
                out.write(l)

    alltests=test_suite()
    runner = TextTestRunner()
    runner.run(alltests)

def debug():
    """Build the suite with test_suite() and call its debug() method."""
    suite = test_suite()
    suite.debug()

if __name__ == '__main__':
    # An optional first argument names the module-level function to call
    # (for example 'debug'); without one, main() is run.
    if len(sys.argv) <= 1:
        main()
    else:
        globals()[sys.argv[1]]()



=== Added File Zope/lib/python/Products/ZCatalog/regressiontests/unittest_patched.py === (630/730 lines abridged)
#!/usr/bin/env python
"""
Python unit testing framework, based on Erich Gamma's JUnit and Kent Beck's
Smalltalk testing framework.

Further information is available in the bundled documentation, and from

  http://pyunit.sourceforge.net/

This module contains the core framework classes that form the basis of
specific test cases and suites (TestCase, TestSuite etc.), and also a
text-based utility class for running the tests and reporting the results
(TextTestRunner).

Copyright (c) 1999, 2000, 2001 Steve Purcell
This module is free software, and you may redistribute it and/or modify
it under the same terms as Python itself, so long as this copyright message
and disclaimer are retained in their original form.

IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
DAMAGE.

THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
"""

# This is a patched version of unittest.py that allows additional
# parameters to be passed to the TestCase constructor.
# This special version is only needed to run the regression test
# in testCatalog.py
#
# ajung

__author__ = "Steve Purcell"
__email__ = "stephen_purcell@yahoo.com"
__version__ = "$Revision: 1.2.2.1 $"[11:-2]

import time
import sys
import traceback
import string
import os

##############################################################################
# A platform-specific concession to help the code work for JPython users

[-=- -=- -=- 630 lines omitted -=- -=- -=-]

        self.parseArgs(argv)
        self.runTests()

    def usageExit(self, msg=None):
        if msg: print msg
        print self.USAGE % self.__dict__
        sys.exit(2)

    def parseArgs(self, argv):
        """Parse the command line `argv` and set up the tests to run.

        argv[0] is skipped.  Any of the options -h, -H or --help prints
        the usage text and exits, as does an option getopt rejects.  With
        no positional arguments and no default test, self.test is set to
        whatever findTestCases() returns for self.module.  Otherwise the
        positional arguments (or the default test) become self.testNames
        and createTests() builds self.test from them.
        """
        import getopt
        try:
            options, args = getopt.getopt(argv[1:], 'hH', ['help'])
            for opt, value in options:
                if opt in ('-h','-H','--help'):
                    self.usageExit()
            if len(args) == 0 and self.defaultTest is None:
                self.test = findTestCases(self.module,
                                          suiteClass=self.suiteClass)
                return
            if len(args) > 0:
                self.testNames = args
            else:
                self.testNames = (self.defaultTest,)
            self.createTests()
        except getopt.error, msg:
            self.usageExit(msg)

    def createTests(self):
        """Set self.test to a suite holding one test instance per name.

        Each entry of self.testNames is turned into a test instance by
        createTestInstance(); the resulting list is wrapped in
        self.suiteClass.
        """
        instances = [createTestInstance(name, self.module,
                                        suiteClass=self.suiteClass)
                     for name in self.testNames]
        self.test = self.suiteClass(instances)

    def runTests(self):
        """Run self.test and exit the process.

        A TextTestRunner is created and stored in self.testRunner when no
        runner has been set.  The exit status is false when the result
        reports success and true otherwise.
        """
        runner = self.testRunner
        if runner is None:
            runner = self.testRunner = TextTestRunner()
        outcome = runner.run(self.test)
        sys.exit(not outcome.wasSuccessful())

# Expose TestProgram under the name `main`; the command-line hook below
# calls it through this alias.
main = TestProgram


##############################################################################
# Executing this module from the command line
##############################################################################

if __name__ == "__main__":
    # No module is passed in when this file itself is run as a script.
    main(module=None)