[Zope3-checkins] CVS: ZODB4/src/zodb/zeo - cache.py:1.4.24.1

Jeremy Hylton jeremy@zope.com
Wed, 18 Jun 2003 16:54:40 -0400


Update of /cvs-repository/ZODB4/src/zodb/zeo
In directory cvs.zope.org:/tmp/cvs-serv28292

Modified Files:
      Tag: ZODB3-2-merge
	cache.py 
Log Message:
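Backport the new ZEO cache code, with some small improvements: cache records now store variable-length oids (a 2-byte oidlen header field followed by the oid bytes), and the cache file magic is bumped from ZEC1 to ZEC2.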


=== ZODB4/src/zodb/zeo/cache.py 1.4 => 1.4.24.1 ===
--- ZODB4/src/zodb/zeo/cache.py:1.4	Thu Mar 13 16:32:30 2003
+++ ZODB4/src/zodb/zeo/cache.py	Wed Jun 18 16:54:39 2003
@@ -13,10 +13,11 @@
 ##############################################################################
 
 # XXX TO DO
-# use two indices rather than the sign bit of the index??????
-# add a shared routine to read + verify a record???
-# redesign header to include vdlen???
-# rewrite the cache using a different algorithm???
+# Add a shared routine to read + verify a record.  Have that routine
+#   return a record object rather than a string.
+# Use two indices rather than the sign bit of the index??????
+# Redesign header to include vdlen???
+# Rewrite the cache using a different algorithm???
 
 """Implement a client cache
 
@@ -44,7 +45,9 @@
 
   offset in record: name -- description
 
-  0: oid -- 8-byte object id
+  0: oidlen -- 2-byte unsigned object id length
+
+  2: reserved (6 bytes)
 
   8: status -- 1-byte status 'v': valid, 'n': non-version valid, 'i': invalid
                ('n' means only the non-version data in the record is valid)
@@ -57,23 +60,25 @@
 
   19: serial -- 8-byte non-version serial (timestamp)
 
-  27: data -- non-version data
+  27: oid -- object id
+
+  27+oidlen: data -- non-version data
 
-  27+dlen: version -- Version string (if vlen > 0)
+  27+oidlen+dlen: version -- Version string (if vlen > 0)
 
-  27+dlen+vlen: vdlen -- 4-byte length of version data (if vlen > 0)
+  27+oidlen+dlen+vlen: vdlen -- 4-byte length of version data (if vlen > 0)
 
-  31+dlen+vlen: vdata -- version data (if vlen > 0)
+  31+oidlen+dlen+vlen: vdata -- version data (if vlen > 0)
 
-  31+dlen+vlen+vdlen: vserial -- 8-byte version serial (timestamp)
+  31+oidlen+dlen+vlen+vdlen: vserial -- 8-byte version serial (timestamp)
                                  (if vlen > 0)
 
-  27+dlen (if vlen == 0) **or**
-  39+dlen+vlen+vdlen: tlen -- 4-byte (unsigned) record length (for
-                              redundancy and backward traversal)
+  27+oidlen+dlen (if vlen == 0) **or**
+  39+oidlen+dlen+vlen+vdlen: tlen -- 4-byte (unsigned) record length (for
+                                     redundancy and backward traversal)
 
-  31+dlen (if vlen == 0) **or**
-  43+dlen+vlen+vdlen: -- total record length (equal to tlen)
+  31+oidlen+dlen (if vlen == 0) **or**
+  43+oidlen+dlen+vlen+vdlen: -- total record length (equal to tlen)
 
 There is a cache size limit.
 
@@ -105,7 +110,6 @@
 file 0 and file 1.
 """
 
-import logging
 import os
 import time
 import logging
@@ -114,9 +118,9 @@
 from thread import allocate_lock
 
 from zodb.utils import u64
-from zodb.interfaces import ZERO
+from zodb.interfaces import ZERO, _fmt_oid
 
-magic = 'ZEC1'
+magic = 'ZEC2'
 headersize = 12
 
 MB = 1024**2
@@ -158,15 +162,13 @@
                 if os.path.exists(p[i]):
                     fi = open(p[i],'r+b')
                     if fi.read(4) == magic: # Minimal sanity
-                        fi.seek(0, 2)
-                        if fi.tell() > headersize:
-                            # Read serial at offset 19 of first record
-                            fi.seek(headersize + 19)
-                            s[i] = fi.read(8)
+                        # Read the ltid for this file.  If it never
+                        # saw a transaction commit, it will get tossed,
+                        # even if it has valid data.
+                        s[i] = fi.read(8)
                     # If we found a non-zero serial, then use the file
                     if s[i] != ZERO:
                         f[i] = fi
-                    fi = None
 
             # Whoever has the larger serial is the current
             if s[1] > s[0]:
@@ -186,11 +188,16 @@
             self._p = p = [None, None]
             f[0].write(magic + '\0' * (headersize - len(magic)))
             current = 0
+        self._current = current
 
-        self.log("%s: storage=%r, size=%r; file[%r]=%r",
-                 self.__class__.__name__, storage, size, current, p[current])
+        if self._ltid:
+            ts = "; last txn=%x" % u64(self._ltid)
+        else:
+            ts = ""
+        self.log("%s: storage=%r, size=%r; file[%r]=%r%s" %
+                 (self.__class__.__name__, storage, size, current, p[current],
+                  ts))
 
-        self._current = current
         self._setup_trace()
 
     def open(self):
@@ -224,6 +231,18 @@
                 except OSError:
                     pass
 
+    def _read_header(self, f, pos):
+        # Read record header from f at pos, returning header and oid.
+        f.seek(pos)
+        h = f.read(27)
+        if len(h) != 27:
+            self.log("_read_header: short record at %s in %s", pos, f.name)
+            return None, None
+        oidlen = unpack(">H", h[:2])[0]
+        oid = f.read(oidlen)
+        return h, oid
+        
+
     def getLastTid(self):
         """Get the last transaction id stored by setLastTid().
 
@@ -243,7 +262,7 @@
         f = self._f[self._current]
         f.seek(4)
         tid = f.read(8)
-        if len(tid) < 8 or tid == '\0\0\0\0\0\0\0\0':
+        if len(tid) < 8 or tid == ZERO:
             return None
         else:
             return tid
@@ -255,7 +274,7 @@
         cache file; otherwise it's an instance variable.
         """
         if self._client is None:
-            if tid == '\0\0\0\0\0\0\0\0':
+            if tid == ZERO:
                 tid = None
             self._ltid = tid
         else:
@@ -267,7 +286,7 @@
 
     def _setLastTid(self, tid):
         if tid is None:
-            tid = '\0\0\0\0\0\0\0\0'
+            tid = ZERO
         else:
             tid = str(tid)
             assert len(tid) == 8
@@ -292,18 +311,14 @@
                 return None
             f = self._f[p < 0]
             ap = abs(p)
-            f.seek(ap)
-            h = f.read(27)
-            if len(h) != 27:
-                self.log("invalidate: short record for oid %16x "
-                         "at position %d in cache file %d",
-                         U64(oid), ap, p < 0)
+            h, rec_oid = self._read_header(f, ap)
+            if h is None:
                 del self._index[oid]
                 return None
-            if h[:8] != oid:
-                self.log("invalidate: oid mismatch: expected %16x read %16x "
+            if rec_oid != oid:
+                self.log("invalidate: oid mismatch: expected %s read %s "
                          "at position %d in cache file %d",
-                         U64(oid), U64(h[:8]), ap, p < 0)
+                         _fmt_oid(oid), _fmt_oid(rec_oid), ap, p < 0)
                 del self._index[oid]
                 return None
             f.seek(ap+8) # Switch from reading to writing
@@ -329,16 +344,18 @@
             ap = abs(p)
             seek = f.seek
             read = f.read
-            seek(ap)
-            h = read(27)
-            if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
+            h, rec_oid = self._read_header(f, ap)
+            if h is None:
+                del self._index[oid]
+                return None
+            if len(h) == 27 and h[8] in 'nv' and rec_oid == oid:
                 tlen, vlen, dlen = unpack(">iHi", h[9:19])
             else:
                 tlen = -1
             if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
-                self.log("load: bad record for oid %16x "
+                self.log("load: bad record for oid %s "
                          "at position %d in cache file %d",
-                         U64(oid), ap, p < 0)
+                         _fmt_oid(oid), ap, p < 0)
                 del self._index[oid]
                 return None
 
@@ -357,7 +374,7 @@
                     data = read(dlen)
                     self._trace(0x2A, oid, version, h[19:], dlen)
                     if (p < 0) != self._current:
-                        self._copytocurrent(ap, tlen, dlen, vlen, h, data)
+                        self._copytocurrent(ap, tlen, dlen, vlen, h, oid, data)
                     return data, h[19:]
                 else:
                     self._trace(0x26, oid, version)
@@ -369,12 +386,12 @@
             v = vheader[:-4]
             if version != v:
                 if dlen:
-                    seek(ap+27)
+                    seek(ap+27+len(oid))
                     data = read(dlen)
                     self._trace(0x2C, oid, version, h[19:], dlen)
                     if (p < 0) != self._current:
                         self._copytocurrent(ap, tlen, dlen, vlen, h,
-                                            data, vheader)
+                                            oid, data, vheader)
                     return data, h[19:]
                 else:
                     self._trace(0x28, oid, version)
@@ -386,12 +403,12 @@
             self._trace(0x2E, oid, version, vserial, vdlen)
             if (p < 0) != self._current:
                 self._copytocurrent(ap, tlen, dlen, vlen, h,
-                                    None, vheader, vdata, vserial)
+                                    oid, None, vheader, vdata, vserial)
             return vdata, vserial
         finally:
             self._release()
 
-    def _copytocurrent(self, pos, tlen, dlen, vlen, header,
+    def _copytocurrent(self, pos, tlen, dlen, vlen, header, oid,
                        data=None, vheader=None, vdata=None, vserial=None):
         """Copy a cache hit from the non-current file to the current file.
 
@@ -402,29 +419,31 @@
         if self._pos + tlen > self._limit:
             return # Don't let this cause a cache flip
         assert len(header) == 27
+        oidlen = len(oid)
         if header[8] == 'n':
             # Rewrite the header to drop the version data.
             # This shortens the record.
-            tlen = 31 + dlen
+            tlen = 31 + oidlen + dlen
             vlen = 0
-            # (oid:8, status:1, tlen:4, vlen:2, dlen:4, serial:8)
+            # (oidlen:2, reserved:6, status:1, tlen:4,
+            #  vlen:2, dlen:4, serial:8)
             header = header[:9] + pack(">IHI", tlen, vlen, dlen) + header[-8:]
         else:
             assert header[8] == 'v'
         f = self._f[not self._current]
         if data is None:
-            f.seek(pos+27)
+            f.seek(pos + 27 + len(oid))
             data = f.read(dlen)
             if len(data) != dlen:
                 return
-        l = [header, data]
+        l = [header, oid, data]
         if vlen:
             assert vheader is not None
             l.append(vheader)
             assert (vdata is None) == (vserial is None)
             if vdata is None:
                 vdlen = unpack(">I", vheader[-4:])[0]
-                f.seek(pos+27+dlen+vlen+4)
+                f.seek(pos + 27 + len(oid) + dlen + vlen + 4)
                 vdata = f.read(vdlen)
                 if len(vdata) != vdlen:
                     return
@@ -440,13 +459,12 @@
         g.seek(self._pos)
         g.writelines(l)
         assert g.tell() == self._pos + tlen
-        oid = header[:8]
         if self._current:
             self._index[oid] = - self._pos
         else:
             self._index[oid] = self._pos
         self._pos += tlen
-        self._trace(0x6A, header[:8], vlen and vheader[:-4] or '',
+        self._trace(0x6A, oid, vlen and vheader[:-4] or '',
                     vlen and vserial or header[-8:], dlen)
 
     def update(self, oid, serial, version, data, refs):
@@ -462,9 +480,11 @@
                 ap = abs(p)
                 seek = f.seek
                 read = f.read
-                seek(ap)
-                h = read(27)
-                if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
+                h, rec_oid = self._read_header(f, ap)
+                if h is None:
+                    del self._index[oid]
+                    return None
+                if len(h)==27 and h[8] in 'nv' and rec_oid == oid:
                     tlen, vlen, dlen = unpack(">iHi", h[9:19])
                 else:
                     return self._store(oid, '', '', version, data, serial)
@@ -500,16 +520,19 @@
             ap = abs(p)
             seek = f.seek
             read = f.read
-            seek(ap)
-            h = read(27)
-            if len(h)==27 and h[8] in 'nv' and h[:8]==oid:
+            h, rec_oid = self._read_header(f, ap)
+            if h is None:
+                del self._index[oid]
+                return None
+                
+            if len(h) == 27 and h[8] in 'nv' and rec_oid == oid:
                 tlen, vlen, dlen = unpack(">iHi", h[9:19])
             else:
                 tlen = -1
             if tlen <= 0 or vlen < 0 or dlen < 0 or vlen+dlen > tlen:
-                self.log("modifiedInVersion: bad record for oid %16x "
+                self.log("modifiedInVersion: bad record for oid %s "
                          "at position %d in cache file %d",
-                         U64(oid), ap, p < 0)
+                         _fmt_oid(oid), ap, p < 0)
                 del self._index[oid]
                 return None
 
@@ -581,7 +604,7 @@
         if not s:
             p = ''
             s = ZERO
-        tlen = 31 + len(p)
+        tlen = 31 + len(oid) + len(p)
         if version:
             tlen = tlen + len(version) + 12 + len(pv)
             vlen = len(version)
@@ -590,7 +613,11 @@
 
         stlen = pack(">I", tlen)
         # accumulate various data to write into a list
-        l = [oid, 'v', stlen, pack(">HI", vlen, len(p)), s]
+        assert len(oid) < 2**16
+        assert vlen < 2**16
+        assert tlen < 2L**32
+        l = [pack(">H6x", len(oid)), 'v', stlen,
+             pack(">HI", vlen, len(p)), s, oid]
         if p:
             l.append(p)
         if version:
@@ -643,11 +670,11 @@
         if version:
             code |= 0x80
         self._tracefile.write(
-            struct_pack(">ii8s8s",
+            struct_pack(">iiH8s",
                         time_time(),
                         (dlen+255) & 0x7fffff00 | code | self._current,
-                        oid,
-                        serial))
+                        len(oid),
+                        serial) + oid)
 
     def read_index(self, serial, fileindex):
         index = self._index
@@ -658,9 +685,8 @@
         count = 0
 
         while 1:
-            f.seek(pos)
-            h = read(27)
-            if len(h) != 27:
+            h, oid = self._read_header(f, pos)
+            if h is None:
                 # An empty read is expected, anything else is suspect
                 if h:
                     self.rilog("truncated header", pos, fileindex)
@@ -674,8 +700,6 @@
                 self.rilog("invalid header data", pos, fileindex)
                 break
 
-            oid = h[:8]
-
             if h[8] == 'v' and vlen:
                 seek(dlen+vlen, 1)
                 vdlen = read(4)
@@ -683,7 +707,7 @@
                     self.rilog("truncated record", pos, fileindex)
                     break
                 vdlen = unpack(">i", vdlen)[0]
-                if vlen+dlen+43+vdlen != tlen:
+                if vlen + dlen + 43 + len(oid) + vdlen != tlen:
                     self.rilog("inconsistent lengths", pos, fileindex)
                     break
                 seek(vdlen, 1)
@@ -693,7 +717,7 @@
                     break
             else:
                 if h[8] in 'vn' and vlen == 0:
-                    if dlen+31 != tlen:
+                    if dlen + len(oid) + 31 != tlen:
                         self.rilog("inconsistent nv lengths", pos, fileindex)
                     seek(dlen, 1)
                     if read(4) != h[9:13]: