[Zope-Checkins] CVS: Products/DCOracle2/test - testDCOracle2.py:1.1 dispatcher.py:1.1

Chris Withers cvs-admin at zope.org
Thu Oct 30 12:35:42 EST 2003


Update of /cvs-repository/Products/DCOracle2/test
In directory cvs.zope.org:/tmp/cvs-serv13922/test

Added Files:
	testDCOracle2.py dispatcher.py 
Log Message:
refreshed Andreas' test suite and put it in the right place.


=== Added File Products/DCOracle2/test/testDCOracle2.py ===
import common
import DCOracle2
import dispatcher
import string
import unittest

class test_DC(unittest.TestCase,dispatcher.Dispatcher):
    """Unit tests for DCOracle2.

    Every test talks to a live Oracle database: setUp() opens a connection
    using common.getConnectionString() plus one cursor, tearDown() closes
    the connection again.  The class also mixes in dispatcher.Dispatcher so
    that a test can run a method in several threads via self.dispatcher().
    """

    def __init__(self,func,*args,**kw):
        """Initialise both base classes with the test method name *func*.

        Extra positional and keyword arguments are accepted and ignored.
        """
        unittest.TestCase.__init__(self,func)
        dispatcher.Dispatcher.__init__(self,func)


    def setUp(self):
        """Open a fresh connection and cursor for the test."""
        self.conn = DCOracle2.connect(common.getConnectionString())
        self.c    = self.conn.cursor()


    def tearDown(self):
        """Close the connection and drop the references to it."""
        self.conn.close()
        self.conn=self.c=None


    def _drop_table(self, name):
        """Drop table *name*; a failure (typically: no such table) is ignored."""
        try:
            self.c.execute("drop table %s" % name)
        except Exception:
            # Deliberately best effort: the table simply may not exist yet.
            pass


    def testConnect(self):
        """ try to connect to Oracle """
        # Nothing to do here: setUp() already connected, and a connection
        # failure there is reported as an error for this test.
        pass


    def testCreateTable(self):
        """ try to create test table """

        self._drop_table("partlist")

        self.c.execute("create table partlist (part varchar2(62), "
                       "partno integer, quantity integer, price number(10,2)) "
                       "tablespace users")

        # namegen.out holds one "part,partno,quantity,price" record per line.
        rows = []
        infile = open("namegen.out","r")
        try:
            for line in infile.readlines():
                line = line.strip()
                if line:
                    # Unpacking doubles as a check for exactly four fields.
                    (part, partno, quantity, price) = line.split(',')
                    rows.append((part, partno, quantity, price))
        finally:
            # Close the file even when a malformed line aborts the loop.
            infile.close()

        self.c.executemany("insert into partlist (part, partno, quantity, price) values (:1,:2,:3,:4)", rows)
        self.conn.commit()

        self.c.execute("select part, partno, quantity, price from partlist order by partno")
        results = self.c.fetchall()
        self.assertEqual(len(results), len(rows),
                         "Row count read back differs from rows inserted")


    def testConcurrentReads(self):
        """ read the partlist table from three threads at once """
        # NOTE(review): relies on the partlist table filled by
        # testCreateTable already being present in the database.
        self.dispatcher("testConcurrentReads",("funcConcurrentReads",3,(),{}))


    def funcConcurrentReads(self):
        """Thread body: read the whole partlist table five times.

        All threads share self.conn; each one uses its own cursor.
        """

        env = self.th_setup()

        curs = self.conn.cursor()

        for i in range(5):
            curs.execute("select part, partno, quantity, price from partlist order by partno")
            curs.fetchall()

        self.th_teardown(env)


    def testCLOBS(self,iterations=30):
        """ testing CLOBs """
        self.funcLOBS(iterations,"CLOB")

    def testBLOBS(self,iterations=30):
        """ testing BLOBs """
        self.funcLOBS(iterations,"BLOB")


    def funcLOBS(self,iterations=30,t="CLOB"):
        """Write *iterations* large LOBs of SQL type *t* and read them back.

        iterations -- number of rows to insert
        t          -- LOB column type, "CLOB" or "BLOB"
        """

        self._drop_table("testtab")

        self.c.execute("create table testtab(num integer,content %s) tablespace users" % t)

        # One 100000-character payload per row.  The letter varies with the
        # row number so that content stored under the wrong row is detected.
        payloads = {}
        for i in range(iterations):
            payloads[i] = chr(ord('a') + i % 26) * 100000

        for i in range(iterations):
            # Insert an empty LOB, then select it "for update" to obtain a
            # locator that can be written through.
            self.c.execute("insert into  testtab values(:1,EMPTY_%s())" % t , i)
            self.c.execute("select content from testtab where num=%d for update" % i)
            lob = self.c.fetchone()[0]
            lob.write(payloads[i])
            self.conn.commit()


        self.c.execute("select num,content from testtab order by num")
        for r in self.c.fetchall():
            num = r[0]
            lob = r[1]
            self.assertEqual(lob.read(), payloads[num], "Invalid data in LOB found")


    def testStoredProc1(self):
        """ testing describe() for stored procedures """

        db = self.conn

        # The test passes when none of these lookups raises.
        db.describe('emp_actions')
        db.describe('emp')
        db.mapproc('emp_actions')


    def testStoredProc2(self):
        """ testing stored procedures """

        db      = self.conn

        find    = db.procedure.emp_actions.find
        # Looked up only to check that the attribute access itself works.
        db.procedure.emp_actions.findy

        self.c.execute('select empno, ename from emp')
        for r in self.c.fetchall():
            empid = r[0]
            ename = r[1]
            fe = find(empid)
            # %s, not %d: both values are names, so %d would raise
            # TypeError while building the failure message.
            self.assertEqual(ename, fe,
                             "Wrong employee name found %s vs. %s" % (ename,fe))
 

if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner.
    unittest.main()

        


=== Added File Products/DCOracle2/test/dispatcher.py ===
#!/usr/bin/env python1.5


# Dispatcher for usage inside Zope test environment
# Digital Creations

__version__ = '$Id: dispatcher.py,v 1.1 2003/10/30 17:35:41 chrisw Exp $'


import os,sys,re,string
import threading,time,commands,profile

from StringIO import StringIO

class Dispatcher:

    """
    A multi-purpose thread dispatcher.

    dispatcher() runs methods of this object in a number of worker
    threads, samples the memory usage of the process while they run and
    writes a report (per-thread run time, per-thread data, memory usage)
    to the current log target.

    Methods run through the dispatcher are expected to bracket their work
    with th_setup() / th_teardown().
    """

    def __init__(self,func=''):
        """Create a dispatcher.

        func -- name of the method of this object being exercised; its
                docstring is stored in self.doc for the report.  May be
                empty, in which case self.doc is None.
        """
        # rather than spewing to stderr, put it in a log attribute
        # if people care, they can retrieve this
        # NOTE: this instance attribute shadows the log() method defined
        # below, so on instances "log" is the buffer; write with logn().
        self.log = StringIO()
        self.fp = self.log
        self.f_startup = []
        self.f_teardown = []
        self.lastlog = ""
        self.lock = threading.Lock()
        self.func = func
        self.profiling = 0
        self.th_data = {}
        self.runtime = {}

        # An empty name cannot be looked up; a non-empty unknown name
        # still raises AttributeError.
        if func:
            self.doc = getattr(self,func).__doc__
        else:
            self.doc = None

    def setlog(self,fp):
        """Send subsequent log output to the file-like object *fp*."""
        self.fp = fp

    def log(self,s):
        """Write *s* (no newline added) unless it repeats the previous entry.

        On instances initialised by __init__ this method is hidden by the
        "log" buffer attribute; it stays reachable as Dispatcher.log(obj, s).
        """
        if s==self.lastlog: return
        self.fp.write(s)
        self.fp.flush()
        self.lastlog=s

    def logn(self,s):
        """Write *s* plus a newline unless it repeats the previous entry."""
        if s==self.lastlog: return
        self.fp.write(s + '\n')
        self.fp.flush()
        self.lastlog=s


    def profiling_on(self):
        """Set the profiling flag."""
        self.profiling = 1

    def profiling_off(self):
        """Clear the profiling flag."""
        self.profiling = 0


    def dispatcher(self,name='', *params):
        """ dispatcher for threads
        The dispatcher expects one or several tuples:
        (functionname, number of threads to start , args, keyword args)

        name -- identifier of this run, printed in the report.

        Blocks until every worker thread has finished, then writes the
        report via logn().
        """

        self.mem_usage  = [-1]          # -1 marks "no sample taken yet"

        # The watcher samples until _watching is cleared; the event only
        # serves to cut its one-second nap short.
        self._watching = 1
        self._watch_wakeup = threading.Event()
        watcher = threading.Thread(None,self.mem_watcher,name='memwatcher')
        watcher.start()

        self.start_test = time.time()
        self.name       = name
        self.th_data    = {}
        self.runtime    = {}
        self._threads   = []
        s2s=self.s2s

        try:
            for func,numthreads,args,kw in params:
                # Fail before any thread is started if the name is wrong.
                getattr(self,func)

                for i in range(0,numthreads):
                    # Work on a copy so the caller's dict is left untouched.
                    th_kw = kw.copy()
                    th_kw['t_func'] = func
                    th = threading.Thread(None,self.worker,name="TH_%s_%03d" % (func,i) ,args=args,kwargs=th_kw)
                    self._threads.append(th)

            for th in self._threads: th.start()
            # Wait for our own workers only; unrelated threads running in
            # the process must not keep the dispatcher waiting.
            for th in self._threads: th.join()
        finally:
            self._watching = 0
            self._watch_wakeup.set()
            watcher.join()

        self.logn('ID: %s ' % self.name)
        self.logn('FUNC: %s ' % self.func)
        self.logn('DOC: %s ' % self.doc)
        # params is a tuple: wrap it, or "%" would spread its items over
        # the format string and fail for more than one job.
        self.logn('Args: %s' % (params,))

        for th in self._threads:
            th_name = th.getName()
            self.logn( '%-30s ........................ %9.3f sec' % (th_name, self.runtime[th_name]) )
            for k,v in self.th_data[th_name].items():
                self.logn ('%-30s  %-15s = %s' % (' ',k,v) )


        self.logn("")
        self.logn('Complete running time:                                  %9.3f sec' % (time.time()-self.start_test) )
        if len(self.mem_usage)>1: self.mem_usage.remove(-1)
        # getmem() yields None where memory cannot be determined.
        samples = [m for m in self.mem_usage if m is not None]
        if samples:
            self.logn( "Memory: start: %s, end: %s, low: %s, high: %s" %  \
                        (s2s(samples[0]),s2s(samples[-1]),s2s(min(samples)), s2s(max(samples))))
        else:
            self.logn("Memory: not available")
        self.logn('')


    def worker(self,*args,**kw):
        """Thread body: run the registered startup hooks, the method named
        by kw['t_func'] (called with the remaining arguments) and the
        registered teardown hooks.
        """

        for func in self.f_startup: getattr(self,func)()

        t_func = getattr(self,kw['t_func'])
        del kw['t_func']

        ts = time.time()
        try:
            t_func(*args, **kw)
        finally:
            elapsed = time.time() - ts
            th_name = threading.currentThread().getName()
            # Make sure the report finds an entry for this thread even if
            # the job never called th_teardown(); values stored by
            # th_teardown() take precedence.
            self.lock.acquire()
            try:
                self.runtime.setdefault(th_name, elapsed)
                self.th_data.setdefault(th_name, {})
            finally:
                self.lock.release()

        for func in self.f_teardown: getattr(self,func)()



    def th_setup(self):
        """ initialize thread with some environment data

        Returns a dict holding the start time under 'start'; pass it to
        th_teardown().
        """

        env = {'start': time.time()
                  }
        return env


    def th_teardown(self,env,**kw):
        """ famous last actions of thread

        env -- dict returned by th_setup()
        kw  -- arbitrary per-thread data to show in the report
        """

        th_name = threading.currentThread().getName()
        elapsed = time.time() - env['start']
        self.lock.acquire()
        try:
            self.th_data[th_name] = kw
            self.runtime[th_name] = elapsed
        finally:
            self.lock.release()


    def getmem(self):
        """ try to determine the current memory usage

        Returns the size of the process in bytes as read from
        /proc/<pid>/statm, or None when it cannot be determined (not on
        Linux, or the file is unreadable).
        """

        if not sys.platform.startswith('linux'): return None

        try:
            page_size = os.sysconf('SC_PAGESIZE')
        except (AttributeError, ValueError, OSError):
            page_size = 4096

        try:
            statm = open("/proc/%d/statm" % os.getpid())
            try:
                data = statm.read()
            finally:
                statm.close()
            # First field of statm: total program size, counted in pages.
            return int(data.split()[0]) * page_size
        except (IOError, OSError, ValueError, IndexError):
            return None


    def mem_watcher(self):
        """ thread for watching memory usage

        Appends one sample per second to self.mem_usage until dispatcher()
        clears self._watching; at least one sample is always taken.
        """

        while 1:
            self.mem_usage.append( self.getmem() )
            self._watch_wakeup.wait(1)
            if not self._watching: break


    def register_startup(self,func):
        """Register method name *func* to be called at the start of every worker."""
        self.f_startup.append(func)

    def register_teardown(self,func):
        """Register method name *func* to be called at the end of every worker."""
        self.f_teardown.append(func)


    def s2s(self,n):
        """Format the byte count *n* as a string in Bytes, KB, MB or GB.

        None (no measurement available) is rendered as "n/a".
        """
        if n is None: return "n/a"
        if n <1024.0: return "%8.3lf Bytes" % n
        if n <1024.0*1024.0: return "%8.3lf KB" % (1.0*n/1024.0)
        if n <1024.0*1024.0*1024.0: return "%8.3lf MB" % (1.0*n/1024.0/1024.0)
        return "%8.3lf GB" % (1.0*n/1024.0/1024.0/1024.0)

if __name__ == "__main__":
    # Executed as a script: just build a dispatcher with default arguments.
    d = Dispatcher()
    # print d.getmem()
        




More information about the Zope-Checkins mailing list