[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/ Refactored the connection pool implementation. (I have a feeling that

Jim Fulton jim at zope.com
Thu Oct 30 17:03:45 EDT 2008


Log message for revision 92720:
  Refactored the connection pool implementation. (I have a feeling that
  it could be made simpler still.)
  

Changed:
  U   ZODB/trunk/src/ZODB/DB.py
  U   ZODB/trunk/src/ZODB/historical_connections.txt
  U   ZODB/trunk/src/ZODB/tests/dbopen.txt
  U   ZODB/trunk/src/ZODB/tests/testhistoricalconnections.py

-=-
Modified: ZODB/trunk/src/ZODB/DB.py
===================================================================
--- ZODB/trunk/src/ZODB/DB.py	2008-10-30 18:08:21 UTC (rev 92719)
+++ ZODB/trunk/src/ZODB/DB.py	2008-10-30 21:03:41 UTC (rev 92720)
@@ -103,22 +103,21 @@
     def getTimeout(self):
         return self._timeout
 
-    timeout = property(getTimeout, setTimeout)
+    timeout = property(getTimeout, lambda self, v: self.setTimeout(v))
 
-    size = property(getSize, setSize)
+    size = property(getSize, lambda self, v: self.setSize(v))
 
 class ConnectionPool(AbstractConnectionPool):
 
-    def __init__(self, size, timeout=None):
+    def __init__(self, size, timeout=time()):
         super(ConnectionPool, self).__init__(size, timeout)
 
         # A stack of connections available to hand out.  This is a subset
         # of self.all.  push() and repush() add to this, and may remove
         # the oldest available connections if the pool is too large.
         # pop() pops this stack.  There are never more than size entries
-        # in this stack.  The keys are time.time() values of the push or
-        # repush calls.
-        self.available = BTrees.OOBTree.Bucket()
+        # in this stack.
+        self.available = []
 
     def push(self, c):
         """Register a new available connection.
@@ -127,10 +126,10 @@
         stack even if we're over the pool size limit.
         """
         assert c not in self.all
-        assert c not in self.available.values()
+        assert c not in self.available
         self._reduce_size(strictly_less=True)
         self.all.add(c)
-        self.available[time()] = c
+        self.available.append((time(), c))
         n = len(self.all)
         limit = self.size
         if n > limit:
@@ -147,44 +146,43 @@
         older available connections.
         """
         assert c in self.all
-        assert c not in self.available.values()
+        assert c not in self.available
         self._reduce_size(strictly_less=True)
-        self.available[time()] = c
+        self.available.append((time(), c))
 
     def _reduce_size(self, strictly_less=False):
         """Throw away the oldest available connections until we're under our
         target size (strictly_less=False, the default) or no more than that
         (strictly_less=True).
         """
-        if self.timeout is None:
-            threshhold = None
-        else:
-            threshhold = time() - self.timeout
+        threshhold = time() - self.timeout
         target = self.size
         if strictly_less:
             target -= 1
-        for t, c in list(self.available.items()):
-            if (len(self.available) > target or
-                threshhold is not None and t < threshhold):
-                del self.available[t]
-                self.all.remove(c)
-                # While application code may still hold a reference to `c`,
-                # there's little useful that can be done with this Connection
-                # anymore. Its cache may be holding on to limited resources,
-                # and we replace the cache with an empty one now so that we
-                # don't have to wait for gc to reclaim it. Note that it's not
-                # possible for DB.open() to return `c` again: `c` can never be
-                # in an open state again.
-                # TODO: Perhaps it would be better to break the reference
-                # cycles between `c` and `c._cache`, so that refcounting
-                # reclaims both right now. But if user code _does_ have a
-                # strong reference to `c` now, breaking the cycle would not
-                # reclaim `c` now, and `c` would be left in a user-visible
-                # crazy state.
-                c._resetCache()
-            else:
-                break
 
+        available = self.available
+        while (
+            (len(available) > target)
+            or
+            (available and available[0][0] < threshhold)
+            ):
+            t, c = available.pop(0)
+            self.all.remove(c)
+            # While application code may still hold a reference to `c`,
+            # there's little useful that can be done with this Connection
+            # anymore. Its cache may be holding on to limited resources,
+            # and we replace the cache with an empty one now so that we
+            # don't have to wait for gc to reclaim it. Note that it's not
+            # possible for DB.open() to return `c` again: `c` can never be
+            # in an open state again.
+            # TODO: Perhaps it would be better to break the reference
+            # cycles between `c` and `c._cache`, so that refcounting
+            # reclaims both right now. But if user code _does_ have a
+            # strong reference to `c` now, breaking the cycle would not
+            # reclaim `c` now, and `c` would be left in a user-visible
+            # crazy state.
+            c._resetCache()
+
     def reduce_size(self):
         self._reduce_size()
 
@@ -197,7 +195,7 @@
         """
         result = None
         if self.available:
-            result = self.available.pop(self.available.maxKey())
+            _, result = self.available.pop()
             # Leave it in self.all, so we can still get at it for statistics
             # while it's alive.
             assert result in self.all
@@ -212,19 +210,15 @@
         
         If a connection is no longer viable because it has timed out, it is
         garbage collected."""
-        if self.timeout is None:
-            threshhold = None
-        else:
-            threshhold = time() - self.timeout
-        for t, c in tuple(self.available.items()):
-            if threshhold is not None and t < threshhold:
+        threshhold = time() - self.timeout
+        for t, c in list(self.available):
+            if t < threshhold:
                 del self.available[t]
                 self.all.remove(c)
                 c._resetCache()
             else:
                 c.cacheGC()
 
-
 class KeyedConnectionPool(AbstractConnectionPool):
     # this pool keeps track of keyed connections all together.  It makes
     # it possible to make assertions about total numbers of keyed connections.
@@ -233,100 +227,69 @@
 
     # see the comments in ConnectionPool for method descriptions.
 
-    def __init__(self, size, timeout=None):
+    def __init__(self, size, timeout=time()):
         super(KeyedConnectionPool, self).__init__(size, timeout)
-        # key: {time.time: connection}
-        self.available = BTrees.family32.OO.Bucket()
-        # time.time: key
-        self.closed = BTrees.family32.OO.Bucket()
+        self.pools = {}
 
+    def setSize(self, v):
+        self._size = v
+        for pool in self.pools.values():
+            pool.setSize(v)
+
+    def setTimeout(self, v):
+        self._timeout = v
+        for pool in self.pools.values():
+            pool.setTimeout(v)
+
     def push(self, c, key):
-        assert c not in self.all
-        available = self.available.get(key)
-        if available is None:
-            available = self.available[key] = BTrees.family32.OO.Bucket()
-        else:
-            assert c not in available.values()
-        self._reduce_size(strictly_less=True)
-        self.all.add(c)
-        t = time()
-        available[t] = c
-        self.closed[t] = key
-        n = len(self.all)
-        limit = self.size
-        if n > limit:
-            reporter = logger.warn
-            if n > 2 * limit:
-                reporter = logger.critical
-            reporter("DB.open() has %s open connections with a size "
-                     "of %s", n, limit)
+        pool = self.pools.get(key)
+        if pool is None:
+            pool = self.pools[key] = ConnectionPool(self.size, self.timeout)
+        pool.push(c)
 
     def repush(self, c, key):
-        assert c in self.all
-        self._reduce_size(strictly_less=True)
-        available = self.available.get(key)
-        if available is None:
-            available = self.available[key] = BTrees.family32.OO.Bucket()
-        else:
-            assert c not in available.values()
-        t = time()
-        available[t] = c
-        self.closed[t] = key
+        self.pools[key].repush(c)
 
     def _reduce_size(self, strictly_less=False):
-        if self.timeout is None:
-            threshhold = None
-        else:
-            threshhold = time() - self.timeout
-        target = self.size
-        if strictly_less:
-            target -= 1
-        for t, key in tuple(self.closed.items()):
-            if (len(self.available) > target or
-                threshhold is not None and t < threshhold):
-                del self.closed[t]
-                c = self.available[key].pop(t)
-                if not self.available[key]:
-                    del self.available[key]
-                self.all.remove(c)
-                c._resetCache()
-            else:
-                break
+        for key, pool in list(self.pools.items()):
+            pool._reduce_size(strictly_less)
+            if not pool.all:
+                del self.pools[key]
 
     def reduce_size(self):
         self._reduce_size()
 
     def pop(self, key):
-        result = None
-        available = self.available.get(key)
-        if available:
-            t = available.maxKey()
-            result = available.pop(t)
-            del self.closed[t]
-            if not available:
-                del self.available[key]
-            assert result in self.all
-        return result
+        pool = self.pools.get(key)
+        if pool is not None:
+            return pool.pop()
 
     def map(self, f):
-        self.all.map(f)
+        for pool in self.pools.itervalues():
+            pool.map(f)
 
     def availableGC(self):
-        if self.timeout is None:
-            threshhold = None
-        else:
-            threshhold = time() - self.timeout
-        for t, key in tuple(self.closed.items()):
-            if threshhold is not None and t < threshhold:
-                del self.closed[t]
-                c = self.available[key].pop(t)
-                if not self.available[key]:
-                    del self.available[key]
-                self.all.remove(c)
-                c._resetCache()
-            else:
-                self.available[key][t].cacheGC()
+        for key, pool in self.pools.items():
+            pool.availableGC()
+            if not pool.all:
+                del self.pools[key]
 
+    @property
+    def test_all(self):
+        result = set()
+        for pool in self.pools.itervalues():
+            result.update(pool.all)
+        return frozenset(result)
+
+    @property
+    def test_available(self):
+        result = []
+        for pool in self.pools.itervalues():
+            result.extend(pool.available)
+        return tuple(result)
+        
+        
+
 def toTimeStamp(dt):
     utc_struct = dt.utctimetuple()
     # if this is a leapsecond, this will probably fail.  That may be a good

Modified: ZODB/trunk/src/ZODB/historical_connections.txt
===================================================================
--- ZODB/trunk/src/ZODB/historical_connections.txt	2008-10-30 18:08:21 UTC (rev 92719)
+++ ZODB/trunk/src/ZODB/historical_connections.txt	2008-10-30 21:03:41 UTC (rev 92720)
@@ -13,11 +13,7 @@
 
 A database can be opened historically ``at`` or ``before`` a given transaction
 serial or datetime. Here's a simple example. It should work with any storage
-that supports ``loadBefore``.  Unfortunately that does not include
-MappingStorage, so we use a FileStorage instance.  Also unfortunately, as of
-this writing there is no reliable way to determine if a storage truly
-implements loadBefore, or if it simply returns None (as in BaseStorage), other
-than reading code.
+that supports ``loadBefore``. 
 
 We'll begin our example with a fairly standard set up.  We
 
@@ -28,11 +24,8 @@
 - modify the database again; and
 - commit a transaction.
 
-    >>> import ZODB.FileStorage
-    >>> storage = ZODB.FileStorage.FileStorage(
-    ...     'HistoricalConnectionTests.fs', create=True)
-    >>> import ZODB
-    >>> db = ZODB.DB(storage)
+    >>> import ZODB.MappingStorage
+    >>> db = ZODB.MappingStorage.DB()
     >>> conn = db.open()
 
     >>> import persistent.mapping
@@ -42,14 +35,13 @@
     >>> import transaction
     >>> transaction.commit()
 
-We wait for some ttime to pass, and then make some other changes.
+We wait for some time to pass, record the time, and then make some other changes.
     
     >>> import time
     >>> t = time.time()
     >>> while time.time() <= t:
     ...     time.sleep(.001)    
 
-
     >>> import datetime
     >>> now = datetime.datetime.utcnow()
     
@@ -164,187 +156,81 @@
     >>> db.getHistoricalTimeout()
     400
 
-All three of these values can be specified in a ZConfig file.  We're using
-mapping storage for simplicity, but remember, as we said at the start of this
-document, mapping storage will not work for historical connections (and in fact
-may seem to work but then fail confusingly) because it does not implement
-loadBefore.
+All three of these values can be specified in a ZConfig file. 
 
     >>> import ZODB.config
     >>> db2 = ZODB.config.databaseFromString('''
     ...     <zodb>
     ...       <mappingstorage/>
-    ...       historical-pool-size 5
+    ...       historical-pool-size 3
     ...       historical-cache-size 1500
     ...       historical-timeout 6m
     ...     </zodb>
     ... ''')
     >>> db2.getHistoricalPoolSize()
-    5
+    3
     >>> db2.getHistoricalCacheSize()
     1500
     >>> db2.getHistoricalTimeout()
     360
 
-Let's actually look at these values at work by shining some light into what
-has been a black box up to now.  We'll actually do some white box examination
-of what is going on in the database, pools and connections.
 
-Historical connections are held in a single connection pool with mappings
-from the ``before`` TID to available connections.  First we'll put a new
-pool on the database so we have a clean slate.
+The pool lets us reuse connections.  To see this, we'll open some
+connections, close them, and then open them again:
 
-    >>> historical_conn.close()
-    >>> from ZODB.DB import KeyedConnectionPool
-    >>> db.historical_pool = KeyedConnectionPool(
-    ...     db.historical_pool.size, db.historical_pool.timeout)
+    >>> conns1 = [db2.open(before=serial) for i in range(4)]
+    >>> _ = [c.close() for c in conns1]
+    >>> conns2 = [db2.open(before=serial) for i in range(4)]
 
-Now lets look what happens to the pool when we create and close an historical
-connection.
+Now let's look at what we got.  The first connection in conns2 is the
+last connection in conns1, because it was the last connection closed.
 
-    >>> pool = db.historical_pool
-    >>> len(pool.all)
-    0
-    >>> len(pool.available)
-    0
-    >>> historical_conn = db.open(
-    ...     transaction_manager=transaction1, before=serial)
-    >>> len(pool.all)
-    1
-    >>> len(pool.available)
-    0
-    >>> historical_conn in pool.all
+    >>> conns2[0] is conns1[-1]
     True
-    >>> historical_conn.close()
-    >>> len(pool.all)
-    1
-    >>> len(pool.available)
-    1
-    >>> pool.available.keys()[0] == serial
-    True
-    >>> len(pool.available.values()[0])
-    1
 
-Now we'll open and close two for the same serial to see what happens to the
-data structures.
+Also for the next two:
 
-    >>> historical_conn is db.open(
-    ...     transaction_manager=transaction1, before=serial)
-    True
-    >>> len(pool.all)
-    1
-    >>> len(pool.available)
-    0
-    >>> transaction2 = transaction.TransactionManager()
-    >>> historical_conn2 = db.open(
-    ...     transaction_manager=transaction2, before=serial)
-    >>> len(pool.all)
-    2
-    >>> len(pool.available)
-    0
-    >>> historical_conn2.close()
-    >>> len(pool.all)
-    2
-    >>> len(pool.available)
-    1
-    >>> len(pool.available.values()[0])
-    1
-    >>> historical_conn.close()
-    >>> len(pool.all)
-    2
-    >>> len(pool.available)
-    1
-    >>> len(pool.available.values()[0])
-    2
+    >>> (conns2[1] is conns1[-2]), (conns2[2] is conns1[-3])
+    (True, True)
 
-If you change the historical cache size, that changes the size of the
-persistent cache on our connection.
+But not for the last:
 
-    >>> historical_conn._cache.cache_size
-    2000
-    >>> db.setHistoricalCacheSize(1500)
-    >>> historical_conn._cache.cache_size
-    1500
+    >>> conns2[3] is conns1[-4]
+    False
 
-Now let's look at pool sizes.  We'll set it to two, then open and close three
-connections.  We should end up with only two available connections.
+Because the pool size was set to 3.
 
-    >>> db.setHistoricalPoolSize(2)
+Connections are also discarded if they haven't been used in a while.
+To see this, let's close two of the connections:
 
-    >>> historical_conn = db.open(
-    ...     transaction_manager=transaction1, before=serial)
-    >>> historical_conn2 = db.open(
-    ...     transaction_manager=transaction2, before=serial)
-    >>> transaction3 = transaction.TransactionManager()
-    >>> historical_conn3 = db.open(
-    ...     transaction_manager=transaction3, at=historical_serial)
-    >>> len(pool.all)
-    3
-    >>> len(pool.available)
-    0
+    >>> conns2[0].close(); conns2[1].close()
 
-    >>> historical_conn3.close()
-    >>> len(pool.all)
-    3
-    >>> len(pool.available)
-    1
-    >>> len(pool.available.values()[0])
-    1
+We'll also set the historical timeout to be very low:
 
-    >>> historical_conn2.close()
-    >>> len(pool.all)
-    3
-    >>> len(pool.available)
-    2
-    >>> len(pool.available.values()[0])
-    1
-    >>> len(pool.available.values()[1])
-    1
+    >>> db2.setHistoricalTimeout(.01)
+    >>> time.sleep(.1)
+    >>> conns2[2].close(); conns2[3].close()
 
-    >>> historical_conn.close()
-    >>> len(pool.all)
-    2
-    >>> len(pool.available)
-    1
-    >>> len(pool.available.values()[0])
-    2
+Now, when we open 4 connections:
 
-Notice it dumped the one that was closed at the earliest time.
+    >>> conns1 = [db2.open(before=serial) for i in range(4)]
 
-Finally, we'll look at the timeout.  We'll need to monkeypatch ``time`` for
-this.  (The funky __import__ of DB is because some ZODB __init__ shenanigans
-make the DB class mask the DB module.)
+We'll see that only the last 2 connections from conns2 are in the
+result:
 
-    >>> db.getHistoricalTimeout()
-    400
-    >>> import time
-    >>> delta = 200
-    >>> def stub_time():
-    ...     return time.time() + delta
-    ...
-    >>> DB_module = __import__('ZODB.DB', globals(), locals(), ['chicken'])
-    >>> original_time = DB_module.time
-    >>> DB_module.time = stub_time
+    >>> [c in conns1 for c in conns2]
+    [False, False, True, True]
 
-    >>> historical_conn = db.open(before=serial)
 
-    >>> len(pool.all)
-    2
-    >>> len(pool.available)
-    1
+If you change the historical cache size, that changes the size of the
+persistent cache on our connection.
 
-A close or an open will do garbage collection on the timed out connections.
+    >>> historical_conn._cache.cache_size
+    2000
+    >>> db.setHistoricalCacheSize(1500)
+    >>> historical_conn._cache.cache_size
+    1500
 
-    >>> delta += 200
-    >>> historical_conn.close()
-
-    >>> len(pool.all)
-    1
-    >>> len(pool.available)
-    1
-    >>> len(pool.available.values()[0])
-    1
-
 Invalidations
 =============
 

Modified: ZODB/trunk/src/ZODB/tests/dbopen.txt
===================================================================
--- ZODB/trunk/src/ZODB/tests/dbopen.txt	2008-10-30 18:08:21 UTC (rev 92719)
+++ ZODB/trunk/src/ZODB/tests/dbopen.txt	2008-10-30 21:03:41 UTC (rev 92720)
@@ -239,12 +239,12 @@
 Closing another one will purge the one with MARKER 0 from the stack
 (since it was the first added to the stack):
 
-    >>> [c.MARKER for c in pool.available.values()]
+    >>> [c.MARKER for (t, c) in pool.available]
     [0, 1, 2]
     >>> conns[0].close()  # MARKER 3
     >>> len(pool.available), len(pool.all)
     (3, 5)
-    >>> [c.MARKER for c in pool.available.values()]
+    >>> [c.MARKER for (t, c) in pool.available]
     [1, 2, 3]
 
 Similarly for the other two:
@@ -252,7 +252,7 @@
     >>> conns[1].close(); conns[2].close()
     >>> len(pool.available), len(pool.all)
     (3, 3)
-    >>> [c.MARKER for c in pool.available.values()]
+    >>> [c.MARKER for (t, c) in pool.available]
     [3, 4, 5]
 
 Reducing the pool size may also purge the oldest closed connections:
@@ -260,7 +260,7 @@
     >>> db.setPoolSize(2)  # gets rid of MARKER 3
     >>> len(pool.available), len(pool.all)
     (2, 2)
-    >>> [c.MARKER for c in pool.available.values()]
+    >>> [c.MARKER for (t, c) in pool.available]
     [4, 5]
 
 Since MARKER 5 is still the last one added to the stack, it will be the

Modified: ZODB/trunk/src/ZODB/tests/testhistoricalconnections.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testhistoricalconnections.py	2008-10-30 18:08:21 UTC (rev 92719)
+++ ZODB/trunk/src/ZODB/tests/testhistoricalconnections.py	2008-10-30 21:03:41 UTC (rev 92720)
@@ -25,10 +25,7 @@
 def tearDown(test):
     test.globs['db'].close()
     test.globs['db2'].close()
-    test.globs['storage'].close()
     # the DB class masks the module because of __init__ shenanigans
-    DB_module = __import__('ZODB.DB', globals(), locals(), ['chicken'])
-    DB_module.time = test.globs['original_time']
     module.tearDown(test)
     ZODB.tests.util.tearDown(test)
 



More information about the Zodb-checkins mailing list