[Zope3-checkins] CVS: Zope3/src/zodb/tests - test_pool.py:1.1 test_invalidation.py:1.2

Jeremy Hylton jeremy@zope.com
Tue, 21 Jan 2003 13:19:59 -0500


Update of /cvs-repository/Zope3/src/zodb/tests
In directory cvs.zope.org:/tmp/cvs-serv9061/zodb/tests

Modified Files:
	test_invalidation.py 
Added Files:
	test_pool.py 
Log Message:
Repair pool and invalidation logic in database.

Make sure invalidations are sent to all connections, regardless of
whether they are opened.  Fix logic of connection pools so that
_allocated contains all connections and _pool contains the ones
currently available.

Add test cases to verify fixes.


=== Added File Zope3/src/zodb/tests/test_pool.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test the pool size logic in the database."""

import threading
import time
import unittest

from zodb.storage.mapping import DB

class Counter:
    """A small integer counter that can be shared between threads.

    The count starts at zero.  Every read and every update of the count
    is performed while holding a private lock.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._count = 0

    def inc(self):
        """Add one to the count."""
        guard = self._lock
        guard.acquire()
        try:
            self._count = self._count + 1
        finally:
            guard.release()

    def get(self):
        """Return the current count."""
        guard = self._lock
        guard.acquire()
        try:
            current = self._count
        finally:
            guard.release()
        return current

class ConnectThread(threading.Thread):
    """Thread that opens a database connection and holds it until released.

    Arguments:
    db -- object whose open() method returns a connection; the connection
          must provide root() and close()
    start_counter -- counter whose inc() is called as soon as run() begins
    open_counter -- counter whose inc() is called once open() has returned
    close_event -- event the thread waits on before closing its connection
    """

    def __init__(self, db, start_counter, open_counter, close_event):
        threading.Thread.__init__(self)
        self._db = db
        self._start = start_counter
        self._open = open_counter
        self._close = close_event

    def run(self):
        """Open a connection, touch its root, wait for the event, close."""
        self._start.inc()
        cn = self._db.open()
        # Always hand the connection back, even if something below raises;
        # otherwise the slot stays taken and other threads waiting in
        # open() would never be able to proceed.
        try:
            self._open.inc()
            cn.root()
            self._close.wait()
        finally:
            cn.close()

class PoolTest(unittest.TestCase):
    """Check that the database hands out at most pool_size connections."""

    def setUp(self):
        # Event shared by every worker thread; setting it lets the threads
        # close their connections and finish.
        self.close = threading.Event()
        self.db = DB(pool_size=7)
        self.threads = []

    def tearDown(self):
        # Release any thread that is still holding a connection so that
        # join() cannot block forever.
        self.close.set()
        for t in self.threads:
            t.join()

    def _wait_for(self, counter, target, tries=10, delay=0.1):
        """Poll counter until it reaches target and return the final count.

        The counter is sampled up to `tries` times, `delay` seconds apart.
        The returned value is read after polling stops, so it may be
        below, equal to, or above target; the caller decides what is
        acceptable.
        """
        for i in range(tries):
            if counter.get() >= target:
                break
            time.sleep(delay)
        return counter.get()

    def testPoolLimit(self):
        # The pool is created with a limit of 7, so try it with 10 threads.
        started = Counter()
        opened = Counter()
        for i in range(10):
            t = ConnectThread(self.db, started, opened, self.close)
            t.start()
            self.threads.append(t)

        # It's hard to get the thread synchronization right, so the
        # checks below poll with a timeout instead of blocking.

        # Wait for all the threads to start running.  A thread that has
        # started may not have called open() yet.
        count = self._wait_for(started, 10)
        if count < 10:
            self.fail("Only started %d threads out of 10" % count)

        # Now make sure that only 7 of the 10 threads opened.  The count
        # is checked unconditionally: reaching 7 is not enough, it must
        # also not go beyond 7.  The short pause gives any thread that
        # wrongly got past the limit a chance to show up in the count.
        self._wait_for(opened, 7)
        time.sleep(0.1)
        count = opened.get()
        if count != 7:
            self.fail("Expected 7 threads to open, %d did" % count)

        self.close.set()

        # Now make sure the last three get opened.
        count = self._wait_for(opened, 10)
        if count != 10:
            self.fail("Expected 10 threads to open, %d did" % count)

def test_suite():
    """Return a test suite containing every test method of PoolTest."""
    # unittest.makeSuite() is deprecated and no longer exists in recent
    # Python releases; the default loader builds the same suite.
    return unittest.defaultTestLoader.loadTestsFromTestCase(PoolTest)


=== Zope3/src/zodb/tests/test_invalidation.py 1.1 => 1.2 ===
--- Zope3/src/zodb/tests/test_invalidation.py:1.1	Wed Jan 15 18:33:06 2003
+++ Zope3/src/zodb/tests/test_invalidation.py	Tue Jan 21 13:19:56 2003
@@ -16,6 +16,7 @@
 import unittest
 
 from zodb.db import DB
+from zodb.connection import Connection
 from zodb.storage.mapping import MappingStorage
 from zodb.storage.tests.minpo import MinPO
 
@@ -23,12 +24,12 @@
 
 class InvalidationTests(unittest.TestCase):
 
-    num_connections = 3
+    num_conn = 4
 
     def setUp(self):
         self.storage = MappingStorage()
         self.db = DB(self.storage)
-        self.connections = [self.db.open() for i in range(4)]
+        self.connections = [self.db.open() for i in range(self.num_conn)]
 
     def tearDown(self):
         self.db.close()
@@ -47,7 +48,26 @@
             cn.sync()
             root = cn.root()
             self.assertEqual(root[1].value, 2)
-        
+
+    def testSimpleInvalidationClosedConnection(self):
+        # Test that invalidations are still sent to closed connections.
+        root = self.connections[0].root()
+        obj = root[1] = MinPO(1)
+        get_transaction().commit()
+        for cn in self.connections[1:]:
+            cn.sync()
+            root = cn.root()
+            self.assertEqual(root[1].value, 1)
+            cn.close()
+        obj.value = 2
+        get_transaction().commit()
+        for cn in self.connections[1:]:
+            self.assert_(obj._p_oid in cn._invalidated)
+            # Call reset() which is equivalent to re-opening it via
+            # the db connection pool.
+            cn.reset()
+            root = cn.root()
+            self.assertEqual(root[1].value, 2)
 
 def test_suite():
     return unittest.makeSuite(InvalidationTests)