[Zope3-checkins] SVN: Zope3/trunk/src/zope/app/sqlexpr/ Updated package.

Stephan Richter srichter at cosmos.phy.tufts.edu
Tue Aug 24 17:18:55 EDT 2004


Log message for revision 27255:
  Updated package.
  
  Made tests package a module.
  
  Handle many more user input errors than before.
  
  Added tests to check for all failure cases.
  


Changed:
  U   Zope3/trunk/src/zope/app/sqlexpr/sqlexpr.py
  D   Zope3/trunk/src/zope/app/sqlexpr/tests/
  A   Zope3/trunk/src/zope/app/sqlexpr/tests.py


-=-
Modified: Zope3/trunk/src/zope/app/sqlexpr/sqlexpr.py
===================================================================
--- Zope3/trunk/src/zope/app/sqlexpr/sqlexpr.py	2004-08-24 20:33:03 UTC (rev 27254)
+++ Zope3/trunk/src/zope/app/sqlexpr/sqlexpr.py	2004-08-24 21:18:55 UTC (rev 27255)
@@ -15,71 +15,60 @@
 
 $Id$
 """
-import re
+from zope.component.exceptions import ComponentLookupError
 from zope.interface import implements
-from zope.component import getService, createObject
-from zope.app.rdb import queryForResults
 from zope.tales.interfaces import ITALESExpression
-from zope.tales.expressions import NAME_RE
+from zope.tales.expressions import StringExpr
+from zope.app import zapi
+from zope.app.exception.interfaces import UserError 
+from zope.app.rdb import queryForResults
+from zope.app.rdb.interfaces import IZopeDatabaseAdapter, IZopeConnection
 
-_interp = re.compile(r'\$(%(n)s)|\${(%(n)s(?:/[^}]*)*)}' % {'n': NAME_RE})
+class ConnectionError(UserError):
+    """This exception is raised when the user did not specify an RDB
+    connection."""
 
-class NoConnectionSpecified(Exception):
-    pass
-
-class SQLExpr(object):
+class SQLExpr(StringExpr):
     """SQL Expression Handler class"""
-    implements(ITALESExpression)
 
-    def __init__(self, name, expr, engine):
-        # Completely taken from StringExpr
-        self._s = expr
-        if '%' in expr:
-            expr = expr.replace('%', '%%')
-        self._vars = vars = []
-        if '$' in expr:
-            # Use whatever expr type is registered as "path".
-            path_type = engine.getTypes()['path']
-            parts = []
-            for exp in expr.split('$$'):
-                if parts: parts.append('$')
-                m = _interp.search(exp)
-                while m is not None:
-                    parts.append(exp[:m.start()])
-                    parts.append('%s')
-                    vars.append(path_type(
-                        'path', m.group(1) or m.group(2), engine))
-                    exp = exp[m.end():]
-                    m = _interp.search(exp)
-                if '$' in exp:
-                    raise CompilerError, (
-                        '$ must be doubled or followed by a simple path')
-                parts.append(exp)
-            expr = ''.join(parts)
-        self._expr = expr
-
     def __call__(self, econtext):
-        vvals = []
-        for var in self._vars:
-            v = var(econtext)
-            if isinstance(v, (str, unicode)):
-                v = sql_quote(v)
-            vvals.append(v)
-
         if econtext.vars.has_key('sql_conn'):
             # TODO: It is hard-coded that the connection name variable is called
             # 'sql_conn'. We should find a better solution.
             conn_name = econtext.vars['sql_conn']
-            connection_service = getService("SQLDatabaseConnections",
-                                            econtext.context)
-            connection = connection_service.getConnection(conn_name)
+            adapter = zapi.queryUtility(IZopeDatabaseAdapter, conn_name)
+            if adapter is None:
+                raise ConnectionError, \
+                      ("The RDB DA name, '%s' you specified is not "
+                       "valid." %conn_name)
+            connection = adapter()
         elif econtext.vars.has_key('rdb') and econtext.vars.has_key('dsn'):
             rdb = econtext.vars['rdb']
             dsn = econtext.vars['dsn']
-            connection = createObject(None, rdb, dsn)()
+            try:
+                connection = zapi.createObject(None, rdb, dsn)()
+            except ComponentLookupError:
+                raise ConnectionError, \
+                      ("The factory id, '%s', you specified in the `rdb` "
+                       "attribute did not match any registered factory." %rdb)
+            except TypeError:
+                raise ConnectionError, \
+                      ("The factory id, '%s', you specifed did not create a "
+                       "Zope Database Adapter component." %rdb)
+            if not IZopeConnection.providedBy(connection):
+                raise ConnectionError, \
+                      ("The factory id, '%s', you specifed did not create a "
+                       "Zope Database Adapter component." %rdb)
         else:
-            raise NoConnectionSpecified
+            raise ConnectionError, \
+                  'You did not specify a RDB connection.'
 
+        vvals = []
+        for var in self._vars:
+            v = var(econtext)
+            if isinstance(v, (str, unicode)):
+                v = sql_quote(v)
+            vvals.append(v)
         query = self._expr % tuple(vvals)
         return queryForResults(connection, query)
 

Added: Zope3/trunk/src/zope/app/sqlexpr/tests.py
===================================================================
--- Zope3/trunk/src/zope/app/sqlexpr/tests.py	2004-08-24 20:33:03 UTC (rev 27254)
+++ Zope3/trunk/src/zope/app/sqlexpr/tests.py	2004-08-24 21:18:55 UTC (rev 27255)
@@ -0,0 +1,142 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""SQL Expression Type Tests
+
+$Id: test_sqlexpr.py 26878 2004-08-03 16:25:34Z jim $
+"""
+import unittest
+
+from zope.interface import implements
+from zope.component.factory import Factory
+from zope.component.interfaces import IFactory
+from zope.component.tests.placelesssetup import PlacelessSetup
+from zope.tales.tests.test_expressions import Data
+from zope.tales.engine import Engine
+
+from zope.app.tests import ztapi
+from zope.app.rdb.interfaces import IZopeDatabaseAdapter, IZopeConnection
+from zope.app.rdb.tests.stubs import ConnectionStub
+from zope.app.sqlexpr.sqlexpr import SQLExpr, ConnectionError
+
+
+class AdapterStub(object):
+    implements(IZopeDatabaseAdapter)
+
+    def __init__(self, dsn):
+        return
+
+    def __call__(self):
+        return ConnectionStub()
+
+class ConnectionStub(object):
+    implements(IZopeConnection)
+
+    def __init__(self):
+        self._called = {}
+
+    def cursor(self):
+        return CursorStub()
+
+    def answer(self):
+        return 42
+
+    def commit(self, *ignored):
+        v = self._called.setdefault('commit',0)
+        v += 1
+        self._called['commit'] = v
+
+    def rollback(self, *ignored):
+        v = self._called.setdefault('rollback',0)
+        v += 1
+        self._called['rollback'] = v
+
+class CursorStub(object):
+
+    description = (('id', 0, 0, 0, 0, 0, 0),
+                   ('name', 0, 0, 0, 0, 0, 0),
+                   ('email', 0, 0, 0, 0, 0, 0))
+
+
+    def fetchall(self, *args, **kw):
+        return ((1, 'Stephan', 'srichter'),
+               (2, 'Foo Bar', 'foobar'))
+
+    def execute(self, operation, *args, **kw):
+        if operation != 'SELECT num FROM hitchhike':
+            raise AssertionError(operation, 'SELECT num FROM hitchhike')
+
+
+class TypeInfoStub(object):
+    paramstyle = 'pyformat'
+    threadsafety = 0
+    def getConverter(self, type):
+        return lambda x: x
+
+
+class SQLExprTest(PlacelessSetup, unittest.TestCase):
+
+    def setUp(self):
+        super(SQLExprTest, self).setUp()
+        ztapi.provideUtility(IFactory, Factory(AdapterStub),
+                             'zope.da.Stub')
+        ztapi.provideUtility(IFactory, Factory(lambda x: None),
+                             'zope.Fake')
+        ztapi.provideUtility(IZopeDatabaseAdapter, AdapterStub(''),
+                             'test')
+
+    def test_exprUsingRDBAndDSN(self):
+        context = Data(vars = {'rdb': 'zope.da.Stub', 'dsn': 'dbi://test'})
+        expr = SQLExpr('name', 'SELECT num FROM hitchhike', Engine)
+        result = expr(context)
+        self.assertEqual(1, result[0].id)
+        self.assertEqual('Stephan', result[0].name)
+        self.assertEqual('srichter', result[0].email)
+        self.assertEqual('Foo Bar', result[1].name)
+
+    def test_exprUsingSQLConn(self):
+        context = Data(vars = {'sql_conn': 'test'})
+        expr = SQLExpr('name', 'SELECT num FROM hitchhike', Engine)
+        result = expr(context)
+        self.assertEqual(1, result[0].id)
+        self.assertEqual('Stephan', result[0].name)
+        self.assertEqual('srichter', result[0].email)
+        self.assertEqual('Foo Bar', result[1].name)
+
+    def test_exprUsingRDBAndDSN_InvalidFactoryId(self):
+        context = Data(vars = {'rdb': 'zope.da.Stub1', 'dsn': 'dbi://test'})
+        expr = SQLExpr('name', 'SELECT num FROM hitchhike', Engine)
+        self.assertRaises(ConnectionError, expr, context)
+
+    def test_exprUsingRDBAndDSN_WrongFactory(self):
+        context = Data(vars = {'rdb': 'zope.Fake', 'dsn': 'dbi://test'})
+        expr = SQLExpr('name', 'SELECT num FROM hitchhike', Engine)
+        self.assertRaises(ConnectionError, expr, context)
+
+    def test_exprUsingSQLConn_WrongId(self):
+        context = Data(vars = {'sql_conn': 'test1'})
+        expr = SQLExpr('name', 'SELECT num FROM hitchhike', Engine)
+        self.assertRaises(ConnectionError, expr, context)
+
+    def test_noRDBSpecs(self):
+        expr = SQLExpr('name', 'SELECT num FROM hitchhike', Engine)
+        self.assertRaises(ConnectionError, expr, Data(vars={}))
+
+
+def test_suite():
+    return unittest.TestSuite((
+        unittest.makeSuite(SQLExprTest),
+        ))
+
+if __name__ == '__main__':
+    unittest.main(defaultTest='test_suite')



More information about the Zope3-Checkins mailing list