[Zodb-checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.110.2.3 cache.py:1.1.2.3

Jeremy Hylton cvs-admin at zope.org
Mon Nov 10 17:43:25 EST 2003


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv9752/ZEO

Modified Files:
      Tag: ZODB3-mvcc-2-branch
	ClientStorage.py cache.py 
Log Message:
Revise ClientCache implementation and integrate with one of Tim's in-memory caches.

The in-memory cache isn't the expected long-term solution, but it shows how to integrate a lower-level storage scheme with the necessary application-level interface.

Add some simple tests of eviction, which the previous cache did not implement.

Add code on the way towards serialization.
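
As a rough illustration of the application-level interface the cache now exposes (a sketch based on the diff below; oid, data, serial, tid1, and tid2 are placeholder values, and invalidate()'s signature is assumed from the method body shown in the diff):

    cache = ClientCache()
    # Store current, non-version data written by transaction tid1.
    cache.store(oid, "", serial, tid1, None, data)
    cache.load(oid)                      # -> (data, serial)
    # A later transaction tid2 makes that revision non-current.
    cache.invalidate(oid, "", tid2)
    cache.loadNonCurrent(oid, tid2)      # -> (data, serial, tid1, tid2)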


=== ZODB3/ZEO/ClientStorage.py 1.110.2.2 => 1.110.2.3 ===
--- ZODB3/ZEO/ClientStorage.py:1.110.2.2	Wed Nov  5 23:41:54 2003
+++ ZODB3/ZEO/ClientStorage.py	Mon Nov 10 17:42:54 2003
@@ -27,7 +27,7 @@
 import types
 
 from ZEO import ServerStub
-from ZEO.cache import Cache
+from ZEO.cache import ClientCache
 from ZEO.TransactionBuffer import TransactionBuffer
 from ZEO.Exceptions import ClientStorageError, UnrecognizedResult, \
      ClientDisconnected, AuthError
@@ -92,7 +92,7 @@
     # Classes we instantiate.  A subclass might override.
 
     TransactionBufferClass = TransactionBuffer
-    ClientCacheClass = Cache
+    ClientCacheClass = ClientCache
     ConnectionManagerClass = ConnectionManager
     StorageServerStubClass = ServerStub.StorageServer
 


=== ZODB3/ZEO/cache.py 1.1.2.2 => 1.1.2.3 ===
--- ZODB3/ZEO/cache.py:1.1.2.2	Wed Nov  5 23:35:21 2003
+++ ZODB3/ZEO/cache.py	Mon Nov 10 17:42:54 2003
@@ -13,6 +13,11 @@
 ##############################################################################
 """Example client cache that stores multiple revisions of an object."""
 
+import bisect
+import struct
+
+from sets import Set
+
 ##
 # A disk-based cache for ZEO clients.
 # <p>
@@ -39,24 +44,88 @@
 # invalidations every time an object is modified.  When the client is
 # disconnected, it must perform cache verification to make sure its
 # cached data is synchronized with the storage's current state.
-
+# <p>
 # quick verification
 # full verification
+# <p>
+# XXX serialization
+# cache is normally an in-memory data structure?
+#    pro: faster, simpler
+#    cons: might use too much memory on small machine
+#          perhaps, just use smaller cache
+# periodically write snapshot of dll to disk
+# as invalidations come in, write them to a log
+# on close, write new snapshot
+# XXX unlink old snapshot first?
 
-class Cache:
+class ClientCache:
     """A simple in-memory cache."""
 
-    def __init__(self):
+    ##
+    # Do we put the constructor here?
+    # @param path path of persistent snapshot of cache state
+    # @param size maximum size of object data, in bytes
+
+    def __init__(self, path=None, size=None):
+        self.path = path
+        self.size = size
+        
+        # Last transaction seen by the cache, either via setLastTid()
+        # or by invalidate().
         self.tid = None
+
+        # The cache stores objects in a dict mapping (oid, tid) pairs
+        # to Object() records (see below).  The tid is the transaction
+        # id that wrote the object.  An object record includes data,
+        # serialno, and end tid.  It has auxillary data structures to
+        # compute the appropriate tid, given the oid and a transaction id
+        # representing an arbitrary point in history.
+        #
+        # The serialized form of the cache just stores the Object()
+        # records.  The in-memory form can be reconstructed from these
+        # records.
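+        #
+        # For example (with small, purely illustrative tids): if an
+        # object oid is written by txn 3 and overwritten by txn 7, the
+        # cache holds records keyed (oid, 3) and (oid, 7), with
+        # current[oid] == 7 and noncurrent[oid] == [(3, 7)].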
+        
+        # Maps oid to current tid.  Used to compute the cache key for
+        # an object's current revision.
         self.current = {}
-        self.current_tid = {}
-        self.version = {}
+        # Maps oid to list of (start_tid, end_tid) pairs in sorted order.
+        # Used to find matching key for load of non-current data.
         self.noncurrent = {}
+        # XXX I think oid, version can map to oid, tid without confusion.
+        # A transaction can write version data or non-version data,
+        # but not both.
+        self.version = {}
 
-    # XXX perhaps we need an open, too
+        # The Cache object (below) manages a doubly-linked list of the
+        # cached objects and decides which objects to keep and which
+        # to evict.
+        self.dll = Cache(size or 10**6)
+
+    def open(self):
+        self._read()
+
+    def _read(self):
+        f = open(self.path, "rb")
+        while 1:
+            o = Object.fromFile(f, self)
+            if o is None:
+                break
+            oid = o.key[0]
+            if o.version:
+                self.version[oid] = o.start_tid
+            elif o.end_tid is None:
+                self.current[oid] = o.start_tid
+            else:
+                # XXX recreate the internal cache data structures
+                print "non-current cache data"
+        f.close()
 
     def close(self):
-        pass
+        self._write()
+
+    def _write(self):
+        # XXX The cache needs a minimal header.
+        # magic cookie, format version no, configured size of cache
+        f = open(self.path, "wb")
+        for o in self.dll:
+            o.serialize(f)
+        f.close()
 
     ##
     # Set the last transaction seen by the cache.
@@ -79,20 +148,19 @@
         return self.tid
 
     ##
-    # Return the current data record for oid and version
+    # Return the current data record for oid and version.
     # @param oid object id
     # @param version a version string
-    # @return data record and serial number
+    # @return data record and serial number or None if the object is not
+    #         in the cache
     # @defreturn 2-tuple: (string, string)
 
     def load(self, oid, version=""):
-        if version:
-            t = self.version.get(oid)
-            if t is not None:
-                stored_version, data, serial, tid = t
-                if version == stored_version:
-                    return data, serial
-        return self.current.get(oid)
+        tid = version and self.version.get(oid) or self.current.get(oid)
+        if tid is None:
+            return None
+        o = self.dll.access((oid, tid))
+        return o.data, o.serialno
 
     ##
     # Return a non-current revision of oid that was current before tid.
@@ -102,10 +170,22 @@
     # @defreturn 4-tuple: (string, string, string, string)
 
     def loadNonCurrent(self, oid, tid):
-        for lo, hi, data, serial in self.noncurrent.get(oid, []):
-            if lo < tid <= hi:
-                return data, serial, lo, hi
-        return None
+        L = self.noncurrent.get(oid)
+        if L is None:
+            return None
+        # A pair with None as the second element will always be less
+        # than any pair with the same first tid.
+        i = bisect.bisect_left(L, (tid, None))
+        # The least element left of tid was written before tid.  If
+        # there is no element, the cache doesn't have old enough data.
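+        # For example (illustrative integer tids): with L == [(2, 5),
+        # (5, 8)] and tid == 6, bisect_left returns 2, so L[i-1] is
+        # (5, 8) and the check 5 < 6 <= 8 below succeeds.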
+        if i == 0:
+            return
+        lo, hi = L[i-1]
+        # XXX lo should always be less than tid
+        if not lo < tid <= hi:
+            return None
+        o = self.dll.access((oid, lo))
+        return o.data, o.serialno, o.start_tid, o.end_tid
 
     ##
     # Return the version an object is modified in or None for an
@@ -115,10 +195,11 @@
     # @defreturn string or None
 
     def modifiedInVersion(self, oid):
-        t = self.version.get(oid)
-        if t is None:
+        tid = self.version.get(oid)
+        if tid is None:
             return None
-        return t[0]
+        o = self.dll.access((oid, tid))
+        return o.version
 
     ##
     # Store a new data record in the cache.
@@ -134,21 +215,21 @@
     # @exception ValueError tried to store non-current version data
 
     def store(self, oid, version, serial, start_tid, end_tid, data):
+        o = Object((oid, start_tid), version, serial, data, start_tid, end_tid,
+                   self)
         if version:
             if end_tid is not None:
                 raise ValueError("cache only stores current version data")
-            self.version[oid] = version, data, serial, start_tid
-            return
-        # XXX If there was a previous revision, we need to invalidate it.
-        if end_tid is None:
-            if oid in self.current:
-                raise ValueError("already have current data for oid")
-            self.current[oid] = data, serial
-            self.current_tid[oid] = start_tid
+            self.version[oid] = start_tid
         else:
-            # XXX could use bisect and keep the list sorted
-            L = self.noncurrent.setdefault(oid, [])
-            L.append((start_tid, end_tid, data, serial))
+            if end_tid is None:
+                if oid in self.current:
+                    raise ValueError("already have current data for oid")
+                self.current[oid] = start_tid
+            else:
+                L = self.noncurrent.setdefault(oid, [])
+                bisect.insort_left(L, (start_tid, end_tid))
+        self.dll.add(o)
 
     ##
     # Mark the current data for oid as non-current.  If there is no
@@ -163,13 +244,15 @@
         if version:
             if oid in self.version:
                 del self.version[oid]
-        data = self.current.get(oid)
-        if data is None:
             return
-        del self.current[oid]
-        start_tid = self.current_tid.pop(oid)
+        if oid not in self.current:
+            return
+        cur_tid = self.current.pop(oid)
+        # XXX Want to fetch object without marking it as accessed
+        o = self.dll.access((oid, cur_tid))
+        o.end_tid = tid
         L = self.noncurrent.setdefault(oid, [])
-        L.append((start_tid, tid, data[0], data[1]))
+        bisect.insort_left(L, (cur_tid, tid))
 
     ##
     # An iterator yielding the current contents of the cache.
@@ -177,7 +260,300 @@
     # @return oid, version, serial number triples
 
     def contents(self):
+        # XXX
         for oid, (data, serial) in self.current.items():
             yield oid, "", serial
         for oid, (version, data, serial, start_tid) in self.version.items():
             yield oid, version, serial
+
+    ##
+    # Return the number of object revisions in the cache.
+
+    def __len__(self):
+        n = len(self.current) + len(self.version)
+        if self.noncurrent:
+            n += reduce(int.__add__, map(len, self.noncurrent.values()))
+        return n
+
+    def _evicted(self, o):
+        # Called by Object o to signal its eviction
+        oid, tid = o.key
+        if o.end_tid is None:
+            if o.version:
+                del self.version[oid]
+            else:
+                del self.current[oid]
+        else:
+            # XXX haven't handled non-current
+            L = self.noncurrent[oid]
+            i = bisect.bisect_left(L, (o.start_tid, o.end_tid))
+            print L, i
+
+##
+# An object that's part of a headed, circular, doubly-linked list.
+# Because it's doubly linked, an object can be removed from the list
+# in constant time.
+# <p>
+# The cache keeps such a list of all Objects (see later), and uses a
+# travelling pointer to decay the worth of objects over time.
+
+class DLLNode(object):
+    # previous and next objects in circular list
+    __slots__ = '_prev', '_next'
+
+    def __init__(self):
+        self._prev = self._next = None
+
+    # Insert self immediately before other in list.
+    def insert_before(self, other):
+        prev = other._prev
+        self._prev = prev
+        self._next = other
+        prev._next = other._prev = self
+
+    # Insert self immediately after other in list.
+    def insert_after(self, other):
+        self.insert_before(other._next)
+
+    # Remove self from the list.
+    def unlink(self):
+        prev, next = self._prev, self._next
+        prev._next = next
+        next._prev = prev
+        self._prev = self._next = None
+##
+# The head of a doubly-linked list.
+
+class DLLHead(DLLNode):
+    def __init__(self):
+        self._prev = self._next = self
+
+    # In Boolean context, a DLLHead is true iff the list isn't empty.
+    def __nonzero__(self):
+        return self._next is not self
+
+##
+# A node for a data object stored in the cache.
+
+# XXX Objects probably keep a pointer to the ClientCache so they
+# can remove themselves from auxiliary data structures.
+
+class Object(DLLNode):
+    __slots__ = (# object id, txn id -- something usable as a dict key
+                 "key",
+
+                 # memory size -- an integer
+                 "msize",
+
+                 # one-byte int giving the usefulness of the object --
+                 # the larger the value, the more reluctant we are
+                 # to evict the object
+                 "worth",
+
+                 "start_tid", # string, id of txn that wrote the data
+                 "end_tid", # string, id of txn that wrote next revision
+                            # or None
+                 "version", # string, name of version
+                 "serialno", # string, serial number assigned by txn
+                 "data", # string, the actual data record for the object
+
+                 "cache", # ClientCache instance containing Object
+                )
+
+    def __init__(self, key, version, serialno, data, start_tid, end_tid,
+                 cache):
+        self.key = key
+        self.msize = len(data)
+        self.worth = None
+        self.version = version
+        self.serialno = serialno
+        self.data = data
+        self.start_tid = start_tid
+        self.end_tid = end_tid
+        self.cache = cache
+
+    def unlink(self):
+        DLLNode.unlink(self)
+        self.cache._evicted(self)
+
+    def serialize(self, f):
+        # Write standard form of Object to file, f.
+        s = struct.pack(">QQQQhi", self.key[0], self.serialno,
+                        self.start_tid, self.end_tid, len(self.version),
+                        len(self.data))
+        f.write(s)
+        f.write(self.version)
+        f.write(self.data)
+        f.write(struct.pack(">Q", self.key[0]))
+
+    def fromFile(cls, f, cache):
+        fmt = ">QQQQhi"
+        s = f.read(struct.calcsize(fmt))
+        if not s:
+            return None
+        oid, serialno, start_tid, end_tid, vlen, dlen = struct.unpack(fmt, s)
+        version = f.read(vlen)
+        if vlen != len(version):
+            raise ValueError("corrupted record, version")
+        data = f.read(dlen)
+        if dlen != len(data):
+            raise ValueError("corrupted record, data")
+        s = f.read(8)
+        if struct.unpack(">Q", s)[0] != oid:
+            raise ValueError("corrupted record, oid")
+        return cls((oid, start_tid), version, serialno, data,
+                   start_tid, end_tid, cache)
+
+    fromFile = classmethod(fromFile)
+
+# Serialized format is:
+#     8-byte oid
+#     8-byte serialno
+#     8-byte start_tid
+#     8-byte end_tid
+#     2-byte version length
+#     4-byte data length
+#     version
+#     data
+#     8-byte oid
+# struct format is >QQQQhi
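+#
+# For example, a record with an empty version string and 100 bytes of
+# data takes 38 (struct.calcsize(">QQQQhi")) + 0 + 100 + 8 == 146 bytes
+# on disk.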
+        
+##
+# XXX
+
+class Cache(object):
+    def __init__(self, maxsize):
+        # Maximum total of object sizes we keep in cache.
+        self.maxsize = maxsize
+        # Current total of object sizes in cache.
+        self.currentsize = 0
+
+        # A worth byte maps to a set of all Objects with that worth.
+        # This is cheap to keep updated, and makes finding low-worth
+        # objects for eviction trivial (just march over the worthsets
+        # list, in order).
+        self.worthsets = [Set() for dummy in range(256)]
+
+        # We keep a circular list of all objects in cache.  currentobj
+        # walks around it forever.  Each time _tick() is called, the
+        # worth of currentobj is decreased, basically by shifting
+        # right 1, and currentobj moves on to the next object.  When
+        # an object is first inserted, it enters the list right before
+        # currentobj.  When an object is accessed, its worth is
+        # increased by or'ing in 0x80.  This scheme comes from the
+        # Thor system, and is an inexpensive way to account for both
+        # recency and frequency of access:  recency is reflected in
+        # the leftmost bit set, and frequency by how many bits are
+        # set.
+        #
+        # Note:  because evictions are interleaved with ticks,
+        # unlinking an object is tricky, lest we evict currentobj.  The
+        # class _unlink method takes care of this properly.
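+        #
+        # For example (illustrative): a freshly added 4000-byte object
+        # starts at worth 12 (see add() below); an access raises that
+        # to 0x80 | 12 == 140, and two later ticks decay it to 70 and
+        # then 35.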
+        self.listhead = DLLHead()
+        self.currentobj = self.listhead
+
+        # Map an object.key to its Object.
+        self.key2object = {}
+
+        # Statistics:  _n_adds, _n_added_bytes,
+        #              _n_evicts, _n_evicted_bytes
+        #              _n_accesses
+        self.clearStats()
+
+    def clearStats(self):
+        self._n_adds = self._n_added_bytes = 0
+        self._n_evicts = self._n_evicted_bytes = 0
+        self._n_accesses = 0
+
+    def getStats(self):
+        return (self._n_adds, self._n_added_bytes,
+                self._n_evicts, self._n_evicted_bytes,
+                self._n_accesses,
+               )
+
+    # Support iteration over objects in the cache without
+    # marking the objects as accessed.
+
+    def __iter__(self):
+        return self.key2object.itervalues()
+
+    # Unlink object from the circular list, taking care not to lose
+    # track of the current object.  Always call this instead of
+    # invoking obj.unlink() directly.
+    def _unlink(self, obj):
+        assert obj is not self.listhead
+        if obj is self.currentobj:
+            self.currentobj = obj._next
+        obj.unlink()
+
+    # Change obj.worth to newworth, maintaining invariants.
+    def _change_worth(self, obj, newworth):
+        if obj.worth != newworth:
+            self.worthsets[obj.worth].remove(obj)
+            obj.worth = newworth
+            self.worthsets[newworth].add(obj)
+
+    def add(self, object):
+        self._n_adds += 1
+        self._n_added_bytes += object.msize
+
+        assert object.key not in self.key2object
+        self.key2object[object.key] = object
+
+        newsize = self.currentsize + object.msize
+        if newsize > self.maxsize:
+            self._evictbytes(newsize - self.maxsize)
+            newsize = self.currentsize + object.msize
+        self.currentsize = newsize
+        object.insert_before(self.currentobj)
+
+        # Give new object an initial worth roughly equal to the log
+        # (base 2) of its size.  The intuition is that larger objects
+        # are more expensive to fetch over the network, so are worth
+        # more (at least at first).
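+        # For example, a 4000-byte object starts at worth 12, since
+        # 2**11 < 4000 <= 2**12.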
+        worth = 0
+        targetsize = 1
+        while object.msize > targetsize:
+            worth += 1
+            targetsize <<= 1
+        object.worth = worth
+        self.worthsets[worth].add(object)
+
+    # Decrease the worth of the current object, and advance the
+    # current object.
+    def _tick(self):
+        c = self.currentobj
+        if c is self.listhead:
+            c = c._next
+            if c is self.listhead:  # list is empty
+                return
+        self._change_worth(c, (c.worth + 1) >> 1)
+        self.currentobj = c._next
+
+    def access(self, key):
+        self._n_accesses += 1
+        self._tick()
+        obj = self.key2object.get(key)
+        if obj is None:
+            return False # XXX None?
+        self._change_worth(obj, obj.worth | 0x80)
+        return obj
+        
+    # Evict objects of least worth first, until at least nbytes bytes
+    # have been freed.
+    def _evictbytes(self, nbytes):
+        for s in self.worthsets:
+            while s:
+                if nbytes <= 0:
+                    return
+                obj = s.pop()
+                nbytes -= obj.msize
+                self._evictobj(obj)
+            
+    def _evictobj(self, obj):
+        self._n_evicts += 1
+        self._n_evicted_bytes += obj.msize
+        self.currentsize -= obj.msize
+        self.worthsets[obj.worth].discard(obj)
+        del self.key2object[obj.key]
+        self._unlink(obj)



