[Zope-CVS] CVS: Products/Ape/lib/apelib/zodb3 - scanner.py:1.2 connection.py:1.6 db.py:1.4 gateways.py:1.3 storage.py:1.7

Shane Hathaway shane@zope.com
Wed, 30 Jul 2003 17:33:50 -0400


Update of /cvs-repository/Products/Ape/lib/apelib/zodb3
In directory cvs.zope.org:/tmp/cvs-serv5368/lib/apelib/zodb3

Modified Files:
	connection.py db.py gateways.py storage.py 
Added Files:
	scanner.py 
Log Message:
Merged ape-scan-branch, sneaking in interface updates and minor reformatting.

Ape now watches the filesystem for changes to objects that Zope has in its
cache.


=== Products/Ape/lib/apelib/zodb3/scanner.py 1.1 => 1.2 ===
--- /dev/null	Wed Jul 30 17:33:49 2003
+++ Products/Ape/lib/apelib/zodb3/scanner.py	Wed Jul 30 17:33:12 2003
@@ -0,0 +1,277 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Cache scanner.
+
+Keeps a cache up to date by scanning for changes.
+
+$Id$
+"""
+
+from thread import allocate_lock
+from time import time
+
+from BTrees.OOBTree import OOBTree, OOSet, difference
+from BTrees.IOBTree import IOBTree
+from zLOG import LOG, DEBUG
+
+# FUTURE_TIMEOUT defines how long to keep source information regarding
+# OIDs that might be used soon.
+FUTURE_TIMEOUT = 10 * 60
+
+
+class ScanControl:
+
+    def __init__(self, db=None, scan_interval=10):
+        self.db = db
+        self.next_conn_id = 1
+        self.conn_oids = IOBTree()   # IOBTree({ conn_id -> OOSet([oid]) })
+        self.oids = OOSet()          # OOSet([oid]); union over all connections
+        self.scanner = Scanner()
+        self.lock = allocate_lock()
+        self.scan_interval = scan_interval
+        self.next_scan = time() + scan_interval
+
+
+    def newConnection(self):
+        self.lock.acquire()
+        try:
+            conn_id = self.next_conn_id
+            self.next_conn_id = conn_id + 1
+            return ConnectionScanControl(self, conn_id)
+        finally:
+            self.lock.release()
+
+
+    def setConnectionOIDs(self, conn_id, oids):
+        changed = 0
+        new_oids = OOSet()
+        self.lock.acquire()
+        try:
+            if oids:
+                self.conn_oids[conn_id] = OOSet(oids)
+            else:
+                if self.conn_oids.has_key(conn_id):
+                    del self.conn_oids[conn_id]
+            for set in self.conn_oids.values():
+                new_oids.update(set)
+            if self.oids != new_oids:
+                self.oids = new_oids
+                changed = 1
+        finally:
+            self.lock.release()
+        if changed:
+            self.scanner.setOIDs(new_oids)
+        self.mayScan()
+
+
+    def mayScan(self):
+        now = time()
+        if now >= self.next_scan:
+            self.next_scan = now + self.scan_interval
+            LOG('Ape', DEBUG, 'Scanning %d objects.' % len(self.oids))
+            inv = self.scanner.scan()
+            self.scanner.pruneFuture()
+            LOG('Ape', DEBUG,
+                'Finished scanning. %d objects changed.' % len(inv))
+            if inv:
+                d = {}
+                for oid in inv:
+                    d[oid] = 1
+                if self.db is not None:
+                    self.db.invalidate(d)
+                else:
+                    LOG('Ape', DEBUG, "No database set, so can't invalidate!")
+
+
+class ConnectionScanControl:
+
+    def __init__(self, ctl, conn_id):
+        self.ctl = ctl
+        self.conn_id = conn_id
+        self.next_update = 0
+
+    def ready(self):
+        now = time()
+        if now >= self.next_update:
+            self.next_update = now + self.ctl.scan_interval
+            return 1
+        return 0
+
+    def setOIDs(self, oids):
+        self.ctl.setConnectionOIDs(self.conn_id, oids)
+
+
+class Scanner:
+
+    def __init__(self):
+        self.current = OOBTree()  # OOBTree({ oid -> {source->state} })
+        self.future = {}          # { oid -> ({source->state}, atime) }
+        self.uncommitted = {}     # { tid -> {oid->{source->state}} }
+        self.lock = allocate_lock()
+        self.storage = None
+
+    def setStorage(self, s):
+        # This is needed for calling storage.getSources().
+        self.storage = s
+
+    def setOIDs(self, oids):
+        new_sources = {}  # { oid -> sourcedict }
+        self.lock.acquire()
+        try:
+            removed = difference(self.current, oids)
+            for oid in removed.keys():
+                del self.current[oid]
+            added = difference(oids, self.current)
+            for oid in added.keys():
+                if self.future.has_key(oid):
+                    # Source info for this OID was provided earlier.
+                    sources, atime = self.future[oid]
+                    del self.future[oid]
+                    self.current[oid] = sources
+                else:
+                    new_sources[oid] = None
+        finally:
+            self.lock.release()
+        if new_sources:
+            # Load source info the slow way.
+            if self.storage is not None:
+                LOG('Ape', DEBUG, 'Getting sources for %d oids.'
+                    % len(new_sources))
+                for oid in new_sources.keys():
+                    new_sources[oid] = self.storage.getSources(oid)
+            else:
+                LOG('Ape', DEBUG, "Can't get sources for %d oids. "
+                    "Assuming no sources!" % len(new_sources))
+                # This will cause the scanner to miss changes, but
+                # since no storage is known, there is little we can
+                # do.
+                for oid in new_sources.keys():
+                    new_sources[oid] = {}
+            self.lock.acquire()
+            try:
+                for oid, sources in new_sources.items():
+                    if not self.current.has_key(oid):
+                        self.current[oid] = sources
+                    # else something else added the source info
+                    # while self.lock was released.
+            finally:
+                self.lock.release()
+
+
+    def setSources(self, oid, sources):
+        if sources is None:
+            sources = {}
+        self.lock.acquire()
+        try:
+            if self.current.has_key(oid):
+                # This OID is known to be in use.
+                self.current[oid] = sources
+            else:
+                # This OID might be useful soon.
+                self.future[oid] = (sources, time())
+        finally:
+            self.lock.release()
+
+
+    def setUncommittedSources(self, tid, oid, sources):
+        self.lock.acquire()
+        try:
+            t = self.uncommitted.setdefault(tid, {})
+            t[oid] = sources or {}  # None -> {}; afterCommit iterates .items()
+        finally:
+            self.lock.release()
+
+
+    def scan(self):
+        to_scan = {}        # { repo -> { source -> state } }
+        to_invalidate = {}  # { oid -> 1 }
+        self.lock.acquire()  # setOIDs/setSources mutate self.current under it.
+        try:
+            for oid, sources in self.current.items():
+                for source, state in sources.items():
+                    repo, location = source
+                    to_scan.setdefault(repo, {})[source] = state
+        finally:
+            self.lock.release()
+        changes = {}
+        for repo, d in to_scan.items():
+            c = repo.freshen(d)
+            if c:
+                changes.update(c)
+        if changes:
+            # Something changed.  Map the changes back to oids and
+            # update self.current.
+            self.lock.acquire()
+            try:
+                for oid, sources in self.current.items():
+                    for source, state in sources.items():
+                        if changes.has_key(source):
+                            to_invalidate[oid] = 1
+                            sources[source] = changes[source]
+            finally:
+                self.lock.release()
+        return to_invalidate.keys()
+
+
+    def pruneFuture(self):
+        if self.future:
+            self.lock.acquire()
+            try:
+                # OIDs older than some timeout will probably never be loaded.
+                cutoff = time() - FUTURE_TIMEOUT
+                for oid, (sources, atime) in self.future.items():
+                    if atime < cutoff:
+                        del self.future[oid]
+            finally:
+                self.lock.release()
+            LOG('Ape', DEBUG,
+                'Future sources cache size: %d objects.' % len(self.future))
+
+
+    def afterCommit(self, tid):
+        self.lock.acquire()
+        try:
+            if not self.uncommitted.has_key(tid):
+                return
+            t = self.uncommitted[tid]
+            del self.uncommitted[tid]
+        finally:
+            self.lock.release()
+        # Update the sources with new states for the committed OIDs.
+        to_scan = {}        # { repo -> { source -> state } }
+        for oid, sources in t.items():
+            for source, state in sources.items():
+                repo, location = source
+                to_scan.setdefault(repo, {})[source] = state
+        changes = {}
+        for repo, d in to_scan.items():
+            c = repo.freshen(d)
+            if c:
+                changes.update(c)
+        for oid, sources in t.items():
+            new_sources = {}
+            for source, state in sources.items():
+                state = changes.get(source, state)
+                new_sources[source] = state
+            self.setSources(oid, new_sources)
+
+
+    def afterAbort(self, tid):
+        self.lock.acquire()
+        try:
+            if self.uncommitted.has_key(tid):
+                del self.uncommitted[tid]
+        finally:
+            self.lock.release()
+


=== Products/Ape/lib/apelib/zodb3/connection.py 1.5 => 1.6 ===
--- Products/Ape/lib/apelib/zodb3/connection.py:1.5	Mon May 26 16:20:09 2003
+++ Products/Ape/lib/apelib/zodb3/connection.py	Wed Jul 30 17:33:12 2003
@@ -47,10 +47,25 @@
     tabular records.
     """
     _osio = None
+    _scan_ctl = None
 
     __implements__ = (IKeyedObjectSystem,
                       getattr(Connection, '__implements__', ()))
 
+
+    def _setDB(self, odb):
+        Connection._setDB(self, odb)
+        if odb._scan_ctl is not None:
+            ctl = self._scan_ctl
+            if ctl is None:
+                self._scan_ctl = ctl = odb._scan_ctl.newConnection()
+            if ctl.ready():
+                ctl.setOIDs(self._cache.cache_data.keys())
+                # If there were any invalidations, process them now.
+                if self._invalidated:
+                    self._flush_invalidations()
+
+
     def getObjectSystemIO(self):
         osio = self._osio
         if osio is None:
@@ -488,7 +503,7 @@
             if obj is None:
                 return
             if serial == ResolvedSerial:
-                obj._p_changed = None
+                del obj._p_changed
             else:
                 if change:
                     obj._p_changed = 0
@@ -502,7 +517,7 @@
                 if obj is None:
                     continue
                 if serial == ResolvedSerial:
-                    obj._p_changed = None
+                    del obj._p_changed
                 else:
                     if change:
                         obj._p_changed = 0


=== Products/Ape/lib/apelib/zodb3/db.py 1.3 => 1.4 ===
--- Products/Ape/lib/apelib/zodb3/db.py:1.3	Wed Jun  4 11:44:45 2003
+++ Products/Ape/lib/apelib/zodb3/db.py	Wed Jul 30 17:33:12 2003
@@ -20,6 +20,7 @@
 
 from apelib.core.interfaces import IMapper
 from apelib.core.exceptions import ConfigurationError
+
 from connection import ApeConnection
 from storage import ApeStorage
 from oidencoder import OIDEncoder
@@ -47,11 +48,12 @@
 
     klass = ApeConnection
 
-    # SDH: two extra args.
+    # SDH: some extra args.
     def __init__(self, storage,
                  mapper_resource=None,
                  factory=None,
                  oid_encoder=None,
+                 scan_interval=10,
                  pool_size=7,
                  cache_size=400,
                  cache_deactivate_after=60,
@@ -111,6 +113,14 @@
             assert IOIDEncoder.isImplementedBy(oid_encoder)
         self._oid_encoder = oid_encoder
         self._mapper_resource = mapper_resource
+        if scan_interval > 0:
+            from scanner import ScanControl
+            ctl = ScanControl(db=self, scan_interval=scan_interval)
+            self._scan_ctl = ctl
+            ctl.scanner.setStorage(storage)
+            storage.setScanner(ctl.scanner)
+        else:
+            self._scan_ctl = None
 
         # Pass through methods:
         for m in ('history',


=== Products/Ape/lib/apelib/zodb3/gateways.py 1.2 => 1.3 ===
--- Products/Ape/lib/apelib/zodb3/gateways.py:1.2	Wed Jul  9 11:40:12 2003
+++ Products/Ape/lib/apelib/zodb3/gateways.py	Wed Jul 30 17:33:12 2003
@@ -56,3 +56,6 @@
                 % (repr(data), repr(expect)))
         return None
 
+    def getSources(self, event):
+        return None
+


=== Products/Ape/lib/apelib/zodb3/storage.py 1.6 => 1.7 ===
--- Products/Ape/lib/apelib/zodb3/storage.py:1.6	Wed Jun  4 11:45:21 2003
+++ Products/Ape/lib/apelib/zodb3/storage.py	Wed Jul 30 17:33:12 2003
@@ -60,8 +60,12 @@
         if not name:
             name = 'ApeStorage: ' + ', '.join(names)
         self._ltid = None
+        self._scanner = None
         BaseStorage.BaseStorage.__init__(self, name)
 
+    def setScanner(self, s):
+        self._scanner = s
+
     def __len__(self):
         return 1
 
@@ -96,7 +100,7 @@
         try:
             self._mapper_resource.access(self)  # Update mapper
             keychain = self._oid_encoder.decode(oid)
-            classified_state, hash_value = self._gwio.load(keychain)
+            event, classified_state, hash_value = self._gwio.load(keychain)
             file = StringIO()
             p = Pickler(file)
             p.dump(classified_state)
@@ -104,6 +108,10 @@
             h = self.hash64(hash_value)
             if DEBUG:
                 print 'loaded', `oid`, `h`
+            if self._scanner is not None:
+                gw = event.getMapper().getGateway()
+                sources = gw.getSources(event)
+                self._scanner.setSources(oid, sources)
             return data, h
         finally:
             self._lock_release()
@@ -128,7 +136,7 @@
             if h64 != HASH0:
                 # Overwriting an old object.  Use the hash to verify
                 # that the new data was derived from the old data.
-                old_cs, old_hash = self._gwio.load(keychain, 1)
+                event, old_cs, old_hash = self._gwio.load(keychain)
                 old_h64 = self.hash64(old_hash)
                 if h64 != old_h64:
                     raise POSException.ConflictError(
@@ -140,7 +148,7 @@
                 # NoStateFoundError or a hash of None, otherwise
                 # there's a conflict.
                 try:
-                    cs, old_hash = self._gwio.load(keychain, 1)
+                    event, cs, old_hash = self._gwio.load(keychain)
                 except NoStateFoundError:
                     pass
                 else:
@@ -152,8 +160,12 @@
             file = StringIO(data)
             u = Unpickler(file)
             classified_state = u.load()
-            new_hash = self._gwio.store(keychain, classified_state)
+            event, new_hash = self._gwio.store(keychain, classified_state)
             new_h64 = self.hash64(new_hash)
+            if self._scanner is not None:
+                gw = event.getMapper().getGateway()
+                sources = gw.getSources(event)
+                self._scanner.setUncommittedSources(self._serial, oid, sources)
         finally:
             self._lock_release()
 
@@ -161,6 +173,14 @@
             print 'stored', `oid`, `h64`, `new_h64`
         return new_h64
 
+    def getSources(self, oid):
+        keychain = self._oid_encoder.decode(oid)
+        self._lock_acquire()
+        try:
+            return self._gwio.getSources(keychain)
+        finally:
+            self._lock_release()
+
     def new_oid(self):
         keychain = self._gwio.newKeychain()
         return self._oid_encoder.encode(keychain)
@@ -174,6 +194,8 @@
     def _abort(self):
         for c in self._conn_list:
             c.abort()
+        if self._scanner is not None:
+            self._scanner.afterAbort(self._serial)
 
     def _begin(self, tid, u, d, e):
         for c in self._conn_list:
@@ -183,6 +205,8 @@
         for c in self._conn_list:
             c.finish()
         self._ltid = self._serial
+        if self._scanner is not None:
+            self._scanner.afterCommit(self._serial)
 
     def _vote(self):
         for c in self._conn_list: