[Zope-CVS] CVS: Products/Ape/lib/apelib/zodb3 - connection.py:1.6.2.6 db.py:1.5.2.3 scanner.py:1.2.2.4 storage.py:1.8.2.3

Shane Hathaway shane at zope.com
Sat Jan 3 15:37:10 EST 2004


Update of /cvs-repository/Products/Ape/lib/apelib/zodb3
In directory cvs.zope.org:/tmp/cvs-serv4617/zodb3

Modified Files:
      Tag: ape-0_8-branch
	connection.py db.py scanner.py storage.py 
Log Message:
Changed names in the scanner to make it a little easier to understand.


=== Products/Ape/lib/apelib/zodb3/connection.py 1.6.2.5 => 1.6.2.6 ===
--- Products/Ape/lib/apelib/zodb3/connection.py:1.6.2.5	Sat Jan  3 00:42:44 2004
+++ Products/Ape/lib/apelib/zodb3/connection.py	Sat Jan  3 15:36:38 2004
@@ -56,13 +56,17 @@
 
     def _setDB(self, odb):
         Connection._setDB(self, odb)
-        if odb._scan_ctl is not None:
+        pool_ctl = odb.pool_scan_ctl
+        if pool_ctl is not None:
             ctl = self._scan_ctl
             if ctl is None:
-                self._scan_ctl = ctl = odb._scan_ctl.newConnection()
+                self._scan_ctl = ctl = pool_ctl.newConnection()
             if ctl.elapsed():
-                # Scan, letting the scanner know which OIDs still matter.
+                # Let the scanner know which OIDs matter.
                 ctl.setOIDs(self._cache.cache_data.keys())
+                # If it's time, scan on behalf of the whole pool.
+                if pool_ctl.elapsed():
+                    pool_ctl.scan()
                 # If there were any invalidations, process them now.
                 if self._invalidated:
                     self._flush_invalidations()


=== Products/Ape/lib/apelib/zodb3/db.py 1.5.2.2 => 1.5.2.3 ===
--- Products/Ape/lib/apelib/zodb3/db.py:1.5.2.2	Sat Dec 20 02:31:08 2003
+++ Products/Ape/lib/apelib/zodb3/db.py	Sat Jan  3 15:36:38 2004
@@ -109,11 +109,12 @@
         self._conf_resource = conf_resource
         scan_interval = int(scan_interval)
         if scan_interval > 0:
-            from scanner import ScanControl
-            ctl = ScanControl(db=self, scan_interval=scan_interval)
-            self._scan_ctl = ctl
-            ctl.scanner.setStorage(storage)
-            storage.setScanner(ctl.scanner)
+            from scanner import PoolScanControl, Scanner
+            pool_ctl = PoolScanControl(storage, db=self, scan_interval=scan_interval)
+            self.pool_scan_ctl = pool_ctl
+            scanner = Scanner()
+            storage.scanner = scanner
+            scanner.storage = storage
         else:
             self._scan_ctl = None
 


=== Products/Ape/lib/apelib/zodb3/scanner.py 1.2.2.3 => 1.2.2.4 ===
--- Products/Ape/lib/apelib/zodb3/scanner.py:1.2.2.3	Sat Jan  3 00:42:44 2004
+++ Products/Ape/lib/apelib/zodb3/scanner.py	Sat Jan  3 15:36:38 2004
@@ -27,27 +27,28 @@
 
 # FUTURE_TIMEOUT defines how long to keep source information regarding
 # OIDs that might be used soon.
-FUTURE_TIMEOUT = 10 * 60
+future_timeout = 10 * 60
 
 
-class ScanControl:
+class PoolScanControl:
     """Scanning for a pool of connections.
 
     A ScanControl instance is an attribute of an ApeDB instance.  The
     actual scanning is delegated to a Scanner instance attached to an
-    ApeStorage.  The delegation permits scanning to occur on a ZEO
-    server while the ScanControl instances exist on ZEO clients.
+    ApeStorage.  The delegation theoretically permits scanning to
+    occur on a ZEO server while the ScanControl instances run on
+    separate ZEO clients.
 
     Assigns scanner-specific identities to database connections for
     the purpose of tracking which OIDs are still in use.
     """
 
-    def __init__(self, db=None, scan_interval=10):
+    def __init__(self, storage, db=None, scan_interval=10):
+        self.storage = storage
         self.db = db
         self.next_conn_id = 1
         self.conn_oids = IOBTree()   # IOBTree({ conn_id -> OOSet([oid]) } })
         self.oids = OOSet()          # OOSet([oid])
-        self.scanner = Scanner()
         self.lock = allocate_lock()
         self.scan_interval = scan_interval
         self.next_scan = time() + scan_interval
@@ -67,8 +68,6 @@
 
     def setConnectionOIDs(self, conn_id, oids):
         """Records the OIDs a connection is using and periodically scans.
-
-        Scans only if a timeout for the whole connection pool has elapsed.
         """
         changed = 0
         new_oids = OOSet()
@@ -87,30 +86,37 @@
         finally:
             self.lock.release()
         if changed:
-            self.scanner.setOIDs(new_oids)
-        self._mayScan()
+            self.storage.scanner.setOIDs(new_oids)
 
 
-    def _mayScan(self):
-        """Scans for changes if the scanning interval has elapsed.
+    def elapsed(self):
+        """Returns true if the scan interval has elapsed.
         """
         now = time()
         if now >= self.next_scan:
             self.next_scan = now + self.scan_interval
-            LOG('Ape', DEBUG, 'Scanning %d objects.' % len(self.oids))
-            inv = self.scanner.scan()
-            self.scanner.pruneFuture()
-            LOG('Ape', DEBUG,
-                'Finished scanning. %d objects changed.' % len(inv))
-            if inv:
-                # Some objects changed and the caches need to be invalidated.
-                d = {}
-                for oid in inv:
-                    d[oid] = 1
-                if self.db is not None:
-                    self.db.invalidate(d)
-                else:
-                    LOG('Ape', DEBUG, "No database set, so can't invalidate!")
+            return 1
+        return 0
+
+
+    def scan(self):
+        """Runs a scan and sends invalidation messages to the database.
+        """
+        LOG('Ape', DEBUG, 'Scanning %d objects.' % len(self.oids))
+        scanner = self.storage.scanner
+        inv = scanner.scan(prune)
+        scanner.pruneFuture()
+        LOG('Ape', DEBUG,
+            'Finished scanning. %d objects changed.' % len(inv))
+        if inv:
+            # Some objects changed and the caches need to be invalidated.
+            d = {}
+            for oid in inv:
+                d[oid] = 1
+            if self.db is not None:
+                self.db.invalidate(d)
+            else:
+                LOG('Ape', DEBUG, "No database set, so can't invalidate!")
 
 
 class ConnectionScanControl:
@@ -119,27 +125,27 @@
     Delegates to a ScanControl, which in turn delegates to a Scanner.
     """
 
-    def __init__(self, ctl, conn_id):
-        self.ctl = ctl
+    def __init__(self, pool_ctl, conn_id):
+        self.pool_ctl = pool_ctl
         self.conn_id = conn_id
         self.next_update = 0
 
     def elapsed(self):
         """Returns true if the connection-specific scan interval has elapsed.
 
-        The interval is designed to prevent connections from calling
-        scanOIDs() with excessive frequency.
+        The interval prevents connections from calling setOIDs() with
+        excessive frequency.
         """
         now = time()
         if now >= self.next_update:
-            self.next_update = now + self.ctl.scan_interval
+            self.next_update = now + self.pool_ctl.scan_interval
             return 1
         return 0
 
     def setOIDs(self, oids):
         """Records the OIDs this connection is using.
         """
-        self.ctl.setConnectionOIDs(self.conn_id, oids)
+        self.pool_ctl.setConnectionOIDs(self.conn_id, oids)
 
 
 class Scanner:
@@ -155,13 +161,6 @@
         self.lock = allocate_lock()
         self.storage = None
 
-    def setStorage(self, s):
-        """Attaches this scanner to an ApeStorage.
-
-        This must be called before storage.getPollSources() will work.
-        """
-        self.storage = s
-
     def setOIDs(self, oids):
         """Sets the list of OIDs to scan.
 
@@ -211,30 +210,28 @@
                 self.lock.release()
 
 
-    def setPollSources(self, oid, sources):
-        """Sets the poll sources for one OID.
-
-        This method lets ApeStorage provide the source information
-        before it is actually requested, which might make the system
-        faster overall.  The source information is recorded in either
-        the 'current' or 'future' table.
+    def afterLoad(self, oid, sources):
+        """Called by the storage after an object is loaded.
         """
         if sources is None:
             sources = {}
         self.lock.acquire()
         try:
-            if self.current.has_key(oid):
-                # This OID is known to be in use.
-                self.current[oid] = sources
-            else:
-                # This OID might be useful soon.
+            if not self.current.has_key(oid):
+                # This object is being loaded for the first time.
+                # Make a record of its current state immediately
+                # so that the next scan can pick up changes.
                 self.future[oid] = (sources, time())
+            # else we already have info about this object, and now
+            # isn't a good time to update self.current since that
+            # would prevent changes from being detected at a time when
+            # it's possible to send invalidation messages.
         finally:
             self.lock.release()
 
 
-    def setUncommittedSources(self, tid, oid, sources):
-        """Records source information that should only be used after commit.
+    def afterStore(self, oid, tid, sources):
+        """Called by the storage after an object is stored (but not committed.)
         """
         self.lock.acquire()
         try:
@@ -279,14 +276,12 @@
 
     def pruneFuture(self):
         """Prunes the cache of future source information.
-
-        See setPollSources().
         """
         if self.future:
             self.lock.acquire()
             try:
                 # OIDs older than some timeout will probably never be loaded.
-                cutoff = time() - FUTURE_TIMEOUT
+                cutoff = time() - future_timeout
                 for oid, (sources, atime) in self.future.items():
                     if atime < cutoff:
                         del self.future[oid]
@@ -319,13 +314,21 @@
             c = repo.poll(d)
             if c:
                 changes.update(c)
-        for oid, sources in t.items():
-            new_sources = {}
-            if sources:
-                for source, state in sources.items():
-                    state = changes.get(source, state)
-                    new_sources[source] = state
-            self.setPollSources(oid, new_sources)
+        self.lock.acquire()
+        try:
+            now = time()
+            for oid, sources in t.items():
+                new_sources = {}
+                if sources:
+                    for source, state in sources.items():
+                        state = changes.get(source, state)
+                        new_sources[source] = state
+                if self.current.has_key(oid):
+                    self.current[oid] = new_sources
+                else:
+                    self.future[oid] = (new_sources, now)
+        finally:
+            self.lock.release()
 
 
     def afterAbort(self, tid):


=== Products/Ape/lib/apelib/zodb3/storage.py 1.8.2.2 => 1.8.2.3 ===
--- Products/Ape/lib/apelib/zodb3/storage.py:1.8.2.2	Sat Dec 20 23:24:06 2003
+++ Products/Ape/lib/apelib/zodb3/storage.py	Sat Jan  3 15:36:38 2004
@@ -51,12 +51,9 @@
         if not name:
             name = 'ApeStorage: ' + ', '.join(names)
         self._ltid = None
-        self._scanner = None
+        self.scanner = None
         BaseStorage.BaseStorage.__init__(self, name)
 
-    def setScanner(self, s):
-        self._scanner = s
-
     def __len__(self):
         return 1
 
@@ -99,9 +96,9 @@
             h = self.hash64(hash_value)
             if DEBUG:
                 print 'loaded', `oid`, `h`
-            if self._scanner is not None:
+            if self.scanner is not None:
                 sources = event.mapper.gateway.getPollSources(event)
-                self._scanner.setPollSources(oid, sources)
+                self.scanner.afterLoad(oid, sources)
             return data, h
         finally:
             self._lock_release()
@@ -143,9 +140,9 @@
             classified_state = u.load()
             event, new_hash = self._gwio.store(oid, classified_state, is_new)
             new_h64 = self.hash64(new_hash)
-            if self._scanner is not None:
+            if self.scanner is not None:
                 sources = event.mapper.gateway.getPollSources(event)
-                self._scanner.setUncommittedSources(self._serial, oid, sources)
+                self.scanner.afterStore(oid, self._serial, sources)
         finally:
             self._lock_release()
 
@@ -172,8 +169,8 @@
     def _abort(self):
         for c in self._conn_list:
             c.abort()
-        if self._scanner is not None:
-            self._scanner.afterAbort(self._serial)
+        if self.scanner is not None:
+            self.scanner.afterAbort(self._serial)
 
     def _begin(self, tid, u, d, e):
         for c in self._conn_list:
@@ -183,8 +180,8 @@
         for c in self._conn_list:
             c.finish()
         self._ltid = self._serial
-        if self._scanner is not None:
-            self._scanner.afterCommit(self._serial)
+        if self.scanner is not None:
+            self.scanner.afterCommit(self._serial)
 
     def _vote(self):
         for c in self._conn_list:




More information about the Zope-CVS mailing list