[Zodb-checkins] CVS: ZODB3/ZODB - fspack.py:1.8.8.3 config.py:1.13.4.1 FileStorage.py:1.135.6.2 DB.py:1.53.2.1 Connection.py:1.98.4.1 BaseStorage.py:1.34.4.1

Jeremy Hylton jeremy at zope.com
Mon Sep 15 14:03:44 EDT 2003


Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv13599/ZODB

Modified Files:
      Tag: Zope-2_7-branch
	fspack.py config.py FileStorage.py DB.py Connection.py 
	BaseStorage.py 
Log Message:
Take two: Merge changes from ZODB3-3_2-branch to Zope-2_7-branch.

Please make all future changes on the Zope-2_7-branch instead of the
ZODB3-3_2-branch.

The previous attempt used "cvs up -j ZODB3-3_2-branch", but it appeared
to pick up only a small fraction of the changes.  This attempt is based
on copying a checkout of ZODB3-3_2-branch over the top of a checkout of
Zope-2_7-branch.


=== ZODB3/ZODB/fspack.py 1.8.8.2 => 1.8.8.3 ===
--- ZODB3/ZODB/fspack.py:1.8.8.2	Wed Sep  3 16:03:41 2003
+++ ZODB3/ZODB/fspack.py	Mon Sep 15 14:02:57 2003
@@ -33,7 +33,7 @@
 from types import StringType
 
 from ZODB.referencesf import referencesf
-from ZODB.utils import p64, u64, z64
+from ZODB.utils import p64, u64, z64, oid_repr
 from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC
 
 try:
@@ -54,7 +54,7 @@
 
     def __str__(self):
         if self.oid:
-            msg = "Error reading oid %s.  Found %r" % (_fmt_oid(self.oid),
+            msg = "Error reading oid %s.  Found %r" % (oid_repr(self.oid),
                                                        self.buf)
         else:
             msg = "Error reading unknown oid.  Found %r" % self.buf
@@ -166,7 +166,7 @@
     def checkTxn(self, th, pos):
         if th.tid <= self.ltid:
             self.fail(pos, "time-stamp reduction: %s <= %s",
-                      _fmt_oid(th.tid), _fmt_oid(self.ltid))
+                      oid_repr(th.tid), oid_repr(self.ltid))
         self.ltid = th.tid
         if th.status == "c":
             self.fail(pos, "transaction with checkpoint flag set")
@@ -647,11 +647,15 @@
         # vindex: version -> pos of XXX
         # tindex: oid -> pos, for current txn
         # tvindex: version -> pos of XXX, for current txn
+        # oid2serial: not used by the packer
         
         self.index = fsIndex()
         self.vindex = {}
         self.tindex = {}
         self.tvindex = {}
+        self.oid2serial = {}
+        self.toid2serial = {}
+        self.toid2serial_delete = {}
 
         # Index for non-version data.  This is a temporary structure
         # to reduce I/O during packing


=== ZODB3/ZODB/config.py 1.13 => 1.13.4.1 ===
--- ZODB3/ZODB/config.py:1.13	Mon Jun 16 10:51:49 2003
+++ ZODB3/ZODB/config.py	Mon Sep 15 14:02:58 2003
@@ -157,7 +157,7 @@
             if name.startswith('_'):
                 continue
             setattr(bconf, name, getattr(self.config, name))
-        return storageclass(self.config.name, config=bconf)
+        return storageclass(self.config.envdir, config=bconf)
 
 class BDBMinimalStorage(BDBStorage):
 


=== ZODB3/ZODB/FileStorage.py 1.135.6.1 => 1.135.6.2 ===
--- ZODB3/ZODB/FileStorage.py:1.135.6.1	Mon Jul 21 12:37:18 2003
+++ ZODB3/ZODB/FileStorage.py	Mon Sep 15 14:02:58 2003
@@ -157,17 +157,21 @@
 assert struct.calcsize(TRANS_HDR) == TRANS_HDR_LEN
 assert struct.calcsize(DATA_HDR) == DATA_HDR_LEN
 
+def blather(message, *data):
+    LOG('ZODB FS', BLATHER, "%s blather: %s\n" % (packed_version,
+                                                  message % data))
+
 def warn(message, *data):
     LOG('ZODB FS', WARNING, "%s  warn: %s\n" % (packed_version,
-                                                (message % data)))
+                                                message % data))
 
 def error(message, *data):
     LOG('ZODB FS', ERROR, "%s ERROR: %s\n" % (packed_version,
-                                              (message % data)))
+                                              message % data))
 
 def nearPanic(message, *data):
     LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version,
-                                              (message % data)))
+                                              message % data))
 
 def panic(message, *data):
     message = message % data
@@ -234,8 +238,10 @@
 
         BaseStorage.BaseStorage.__init__(self, file_name)
 
-        index, vindex, tindex, tvindex = self._newIndexes()
-        self._initIndex(index, vindex, tindex, tvindex)
+        (index, vindex, tindex, tvindex,
+         oid2serial, toid2serial, toid2serial_delete) = self._newIndexes()
+        self._initIndex(index, vindex, tindex, tvindex,
+                        oid2serial, toid2serial, toid2serial_delete)
 
         # Now open the file
 
@@ -269,7 +275,8 @@
             self._used_index = 1 # Marker for testing
             index, vindex, start, maxoid, ltid = r
 
-            self._initIndex(index, vindex, tindex, tvindex)
+            self._initIndex(index, vindex, tindex, tvindex,
+                            oid2serial, toid2serial, toid2serial_delete)
             self._pos, self._oid, tid = read_index(
                 self._file, file_name, index, vindex, tindex, stop,
                 ltid=ltid, start=start, maxoid=maxoid,
@@ -302,7 +309,11 @@
 
         self._quota = quota
 
-    def _initIndex(self, index, vindex, tindex, tvindex):
+        # Serialno cache statistics.
+        self._oid2serial_nlookups = self._oid2serial_nhits = 0
+
+    def _initIndex(self, index, vindex, tindex, tvindex,
+                   oid2serial, toid2serial, toid2serial_delete):
         self._index=index
         self._vindex=vindex
         self._tindex=tindex
@@ -310,12 +321,33 @@
         self._index_get=index.get
         self._vindex_get=vindex.get
 
+        # .store() needs to compare the passed-in serial to the current
+        # serial in the database.  _oid2serial caches the oid -> current
+        # serial mapping for non-version data (if the current record for
+        # oid is version data, the oid is not a key in _oid2serial).
+        # The point is that otherwise seeking into the storage is needed
+        # to extract the current serial, and that's an expensive operation.
+        # For example, if a transaction stores 4000 objects, and each
+        # random seek + read takes 7ms (that was approximately true on
+        # Linux and Windows tests in mid-2003), that's 28 seconds just to
+        # find the old serials.
+        # XXX Probably better to junk this and redefine _index as mapping
+        # XXX oid to (offset, serialno) pair, via a new memory-efficient
+        # XXX BTree type.
+        self._oid2serial = oid2serial
+        # oid->serialno map to transactionally add to _oid2serial.
+        self._toid2serial = toid2serial
+        # Set of oids to transactionally delete from _oid2serial (e.g.,
+        # oids reverted by undo, or for which the most recent record
+        # becomes version data).
+        self._toid2serial_delete = toid2serial_delete
+
     def __len__(self):
         return len(self._index)
 
     def _newIndexes(self):
         # hook to use something other than builtin dict
-        return fsIndex(), {}, {}, {}
+        return fsIndex(), {}, {}, {}, {}, {}, {}
 
     _saved = 0
     def _save_index(self):
@@ -483,6 +515,31 @@
             # XXX should log the error, though
             pass # We don't care if this fails.
 
+    # Return serial number of most recent record for oid if that's in
+    # the _oid2serial cache.  Else return None.  It's important to use
+    # this instead of indexing _oid2serial directly so that cache
+    # statistics can be logged.
+    def _get_cached_serial(self, oid):
+        self._oid2serial_nlookups += 1
+        result = self._oid2serial.get(oid)
+        if result is not None:
+            self._oid2serial_nhits += 1
+
+        # Log a msg every ~8000 tries, and prevent overflow.
+        if self._oid2serial_nlookups & 0x1fff == 0:
+            if self._oid2serial_nlookups >> 30:
+                # In older Pythons, we may overflow if we keep it an int.
+                self._oid2serial_nlookups = long(self._oid2serial_nlookups)
+                self._oid2serial_nhits = long(self._oid2serial_nhits)
+            blather("_oid2serial size %s lookups %s hits %s rate %.1f%%",
+                    len(self._oid2serial),
+                    self._oid2serial_nlookups,
+                    self._oid2serial_nhits,
+                    100.0 * self._oid2serial_nhits /
+                            self._oid2serial_nlookups)
+
+        return result
+
     def abortVersion(self, src, transaction):
         return self.commitVersion(src, '', transaction, abort=1)
 
@@ -585,33 +642,11 @@
 
             spos = h[-8:]
             srcpos = u64(spos)
+        self._toid2serial_delete.update(current_oids)
         return oids
 
-    def getSize(self): return self._pos
-
-    def _loada(self, oid, _index, file):
-        "Read any version and return the version"
-        try:
-            pos=_index[oid]
-        except KeyError:
-            raise POSKeyError(oid)
-        except TypeError:
-            raise TypeError, 'invalid oid %r' % (oid,)
-        file.seek(pos)
-        read=file.read
-        h=read(DATA_HDR_LEN)
-        doid,serial,prev,tloc,vlen,plen = unpack(DATA_HDR, h)
-        if vlen:
-            nv = u64(read(8))
-            read(8) # Skip previous version record pointer
-            version = read(vlen)
-        else:
-            version = ''
-            nv = 0
-
-        if plen != z64:
-            return read(u64(plen)), version, nv
-        return _loadBack(file, oid, read(8))[0], version, nv
+    def getSize(self):
+        return self._pos
 
     def _load(self, oid, version, _index, file):
         try:
@@ -632,6 +667,10 @@
                 (read(8) # skip past version link
                  and version != read(vlen))):
                 return _loadBack(file, oid, pnv)
+        else:
+            # The most recent record is for non-version data -- cache
+            # the serialno.
+            self._oid2serial[oid] = serial
 
         # If we get here, then either this was not a version record,
         # or we've already read past the version data!
@@ -713,20 +752,25 @@
 
         self._lock_acquire()
         try:
-            old=self._index_get(oid, 0)
-            pnv=None
+            old = self._index_get(oid, 0)
+            cached_serial = None
+            pnv = None
             if old:
-                self._file.seek(old)
-                h=self._file.read(DATA_HDR_LEN)
-                doid,oserial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
-                if doid != oid: raise CorruptedDataError(h)
-                if vlen:
-                    pnv=self._file.read(8) # non-version data pointer
-                    self._file.read(8) # skip past version link
-                    locked_version=self._file.read(vlen)
-                    if version != locked_version:
-                        raise POSException.VersionLockError, (
-                            `oid`, locked_version)
+                cached_serial = self._get_cached_serial(oid)
+                if cached_serial is None:
+                    self._file.seek(old)
+                    h=self._file.read(DATA_HDR_LEN)
+                    doid,oserial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
+                    if doid != oid: raise CorruptedDataError(h)
+                    if vlen:
+                        pnv=self._file.read(8) # non-version data pointer
+                        self._file.read(8) # skip past version link
+                        locked_version=self._file.read(vlen)
+                        if version != locked_version:
+                            raise POSException.VersionLockError, (
+                                `oid`, locked_version)
+                else:
+                    oserial = cached_serial
 
                 if serial != oserial:
                     data = self.tryToResolveConflict(oid, oserial, serial,
@@ -749,14 +793,19 @@
                        )
                   )
             if version:
-                if pnv: write(pnv)
-                else:   write(p64(old))
+                if pnv:
+                    write(pnv)
+                else:
+                    write(p64(old))
                 # Link to last record for this version:
                 tvindex=self._tvindex
                 pv=tvindex.get(version, 0) or self._vindex_get(version, 0)
                 write(p64(pv))
                 tvindex[version]=here
                 write(version)
+                self._toid2serial_delete[oid] = 1
+            else:
+                self._toid2serial[oid] = newserial
 
             write(data)
 
@@ -875,7 +924,11 @@
                 self._tfile.write(p64(pv))
                 self._tvindex[version] = here
                 self._tfile.write(version)
-            # And finally, write the data or a backpointer
+                self._toid2serial_delete[oid] = 1
+            else:
+                self._toid2serial[oid] = serial
+
+            # Finally, write the data or a backpointer.
             if data is None:
                 if prev_pos:
                     self._tfile.write(p64(prev_pos))
@@ -940,6 +993,8 @@
     def _clear_temp(self):
         self._tindex.clear()
         self._tvindex.clear()
+        self._toid2serial.clear()
+        self._toid2serial_delete.clear()
         if self._tfile is not None:
             self._tfile.seek(0)
 
@@ -1023,6 +1078,12 @@
             
             self._index.update(self._tindex)
             self._vindex.update(self._tvindex)
+            self._oid2serial.update(self._toid2serial)
+            for oid in self._toid2serial_delete.keys():
+                try:
+                    del self._oid2serial[oid]
+                except KeyError:
+                    pass
             
             # Update the number of records that we've written
             # +1 for the transaction record
@@ -1090,21 +1151,28 @@
     def getSerial(self, oid):
         self._lock_acquire()
         try:
-            try:
-                return self._getSerial(oid, self._index[oid])
-            except KeyError:
-                raise POSKeyError(oid)
-            except TypeError:
-                raise TypeError, 'invalid oid %r' % (oid,)
+            result = self._get_cached_serial(oid)
+            if result is None:
+                try:
+                    result = self._getSerial(oid, self._index[oid])
+                except KeyError:
+                    raise POSKeyError(oid)
+                except TypeError:
+                    raise TypeError, 'invalid oid %r' % (oid,)
+            return result
         finally:
             self._lock_release()
 
     def _getSerial(self, oid, pos):
         self._file.seek(pos)
-        h = self._file.read(DATA_HDR_LEN)
+        h = self._file.read(16)
+        if len(h) < 16:
+            raise CorruptedDataError(h)
+        h += self._file.read(26) # get rest of header
+        if h[:8] != oid:
+            raise CorruptedDataError(h)
         oid2, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
-        assert oid == oid2
-        if splen==z64:
+        if splen == z64:
             # a back pointer
             bp = self._file.read(8)
             if bp == z64:
@@ -1243,6 +1311,10 @@
         tpos = self._txn_find(tid, 1)
         tindex = self._txn_undo_write(tpos)
         self._tindex.update(tindex)
+        # Arrange to clear the affected oids from the oid2serial cache.
+        # It's too painful to try to update them to correct current
+        # values instead.
+        self._toid2serial_delete.update(tindex)
         return tindex.keys()
 
     def _txn_find(self, tid, stop_at_pack):
@@ -1500,7 +1572,9 @@
                 # OK, we're beyond the point of no return
                 os.rename(self._file_name + '.pack', self._file_name)
                 self._file = open(self._file_name, 'r+b')
-                self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
+                self._initIndex(p.index, p.vindex, p.tindex, p.tvindex,
+                                p.oid2serial, p.toid2serial,
+                                p.toid2serial_delete)
                 self._pos = opos
                 self._save_index()
             finally:
@@ -1526,20 +1600,9 @@
         if it is a new object -- return None.
         """
         try:
-            pos = self._index[oid]
+            return self.getSerial(oid)
         except KeyError:
             return None
-        except TypeError:
-            raise TypeError, 'invalid oid %r' % (oid,)
-        self._file.seek(pos)
-        # first 8 bytes are oid, second 8 bytes are serialno
-        h = self._file.read(16)
-        if len(h) < 16:
-            raise CorruptedDataError(h)
-        if h[:8] != oid:
-            h = h + self._file.read(26) # get rest of header
-            raise CorruptedDataError(h)
-        return h[8:]
 
     def cleanup(self):
         """Remove all files created by this storage."""


=== ZODB3/ZODB/DB.py 1.53 => 1.53.2.1 ===
--- ZODB3/ZODB/DB.py:1.53	Tue Jun 24 17:50:18 2003
+++ ZODB3/ZODB/DB.py	Mon Sep 15 14:02:58 2003
@@ -32,7 +32,7 @@
         d[elt] = 1
     return d
 
-class DB(UndoLogCompatible.UndoLogCompatible):
+class DB(UndoLogCompatible.UndoLogCompatible, object):
     """The Object Database
 
     The Object database coordinates access to and interaction of one


=== ZODB3/ZODB/Connection.py 1.98 => 1.98.4.1 ===
--- ZODB3/ZODB/Connection.py:1.98	Fri Jun 13 17:53:08 2003
+++ ZODB3/ZODB/Connection.py	Mon Sep 15 14:02:58 2003
@@ -47,7 +47,7 @@
 
 ExtensionKlass = Base.__class__
 
-class Connection(ExportImport.ExportImport):
+class Connection(ExportImport.ExportImport, object):
     """Object managers for individual object space.
 
     An object space is a version of collection of objects.  In a
@@ -136,11 +136,10 @@
         # Explicitly remove references from the connection to its
         # cache and to the root object, because they refer back to the
         # connection.
-        self._cache.clear()
-        self._cache = None
+        if self._cache is not None:
+            self._cache.clear()
         self._incrgc = None
         self.cacheGC = None
-        self._root_ = None
 
     def __getitem__(self, oid, tt=type(())):
         obj = self._cache.get(oid, None)
@@ -176,8 +175,6 @@
         object._p_serial=serial
 
         self._cache[oid] = object
-        if oid=='\0\0\0\0\0\0\0\0':
-            self._root_=object # keep a ref
         return object
 
     def _persistent_load(self,oid,
@@ -279,7 +276,8 @@
         self.__onCloseCallbacks.append(f)
 
     def close(self):
-        self._incrgc() # This is a good time to do some GC
+        if self._incrgc is not None:
+            self._incrgc() # This is a good time to do some GC
 
         # Call the close callbacks.
         if self.__onCloseCallbacks is not None:


=== ZODB3/ZODB/BaseStorage.py 1.34 => 1.34.4.1 ===
--- ZODB3/ZODB/BaseStorage.py:1.34	Tue Jun 10 11:46:31 2003
+++ ZODB3/ZODB/BaseStorage.py	Mon Sep 15 14:02:58 2003
@@ -16,21 +16,26 @@
 $Id$
 """
 import cPickle
-import ThreadLock, bpthread
-import time, UndoLogCompatible
-import POSException
-from TimeStamp import TimeStamp
-z64='\0'*8
+import time
 
-class BaseStorage(UndoLogCompatible.UndoLogCompatible):
+import ThreadLock
+import zLOG
+from ZODB import bpthread
+from ZODB import POSException
+from ZODB.TimeStamp import TimeStamp
+from ZODB.UndoLogCompatible import UndoLogCompatible
+from ZODB.utils import z64
+
+class BaseStorage(UndoLogCompatible):
     _transaction=None # Transaction that is being committed
     _serial=z64       # Transaction serial number
     _tstatus=' '      # Transaction status, used for copying data
     _is_read_only = 0
 
     def __init__(self, name, base=None):
-
-        self.__name__=name
+        self.__name__= name
+        zLOG.LOG(self.__class__.__name__, zLOG.DEBUG,
+                 "create storage %s" % self.__name__)
 
         # Allocate locks:
         l=ThreadLock.allocate_lock()




More information about the Zodb-checkins mailing list