[Zope-Checkins] CVS: ZODB3/ZODB - FileStorage.py:1.139.2.1 DemoStorage.py:1.21.2.1 DB.py:1.55.2.1 Connection.py:1.100.2.1 BaseStorage.py:1.36.2.1

Jeremy Hylton jeremy at zope.com
Tue Oct 7 01:11:03 EDT 2003


Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv28710/ZODB

Modified Files:
      Tag: ZODB3-mvcc-2-branch
	FileStorage.py DemoStorage.py DB.py Connection.py 
	BaseStorage.py 
Log Message:
Merge changes from the ZODB3-mvcc-branch.

This new branch is based on the head (trunk) rather than on the
Zope-2_7-branch.


=== ZODB3/ZODB/FileStorage.py 1.139 => 1.139.2.1 ===
--- ZODB3/ZODB/FileStorage.py:1.139	Thu Oct  2 18:14:04 2003
+++ ZODB3/ZODB/FileStorage.py	Tue Oct  7 01:10:31 2003
@@ -643,7 +643,7 @@
             spos = h[-8:]
             srcpos = u64(spos)
         self._toid2serial_delete.update(current_oids)
-        return oids
+        return self._serial, oids
 
     def getSize(self):
         return self._pos
@@ -1315,7 +1315,7 @@
         # It's too painful to try to update them to correct current
         # values instead.
         self._toid2serial_delete.update(tindex)
-        return tindex.keys()
+        return self._serial, tindex.keys()
 
     def _txn_find(self, tid, stop_at_pack):
         pos = self._pos
@@ -1460,9 +1460,6 @@
         self._lock_acquire()
         try:
             r=[]
-            file=self._file
-            seek=file.seek
-            read=file.read
             try:
                 pos=self._index[oid]
             except KeyError:
@@ -1473,14 +1470,14 @@
 
             while 1:
                 if len(r) >= size: return r
-                seek(pos)
-                h=read(DATA_HDR_LEN)
+                self._file.seek(pos)
+                h=self._file.read(DATA_HDR_LEN)
                 doid,serial,prev,tloc,vlen,plen = unpack(DATA_HDR, h)
                 prev=u64(prev)
 
                 if vlen:
-                    read(16)
-                    version=read(vlen)
+                    self._file.read(16)
+                    version = self._file.read(vlen)
                     if wantver is not None and version != wantver:
                         if prev:
                             pos=prev
@@ -1491,13 +1488,15 @@
                     version=''
                     wantver=None
 
-                seek(u64(tloc))
-                h=read(TRANS_HDR_LEN)
+                self._file.seek(u64(tloc))
+                h = self._file.read(TRANS_HDR_LEN)
                 tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
-                user_name=read(ul)
-                description=read(dl)
-                if el: d=loads(read(el))
-                else: d={}
+                user_name = self._file.read(ul)
+                description = self._file.read(dl)
+                if el:
+                    d=loads(self._file.read(el))
+                else:
+                    d={}
 
                 d['time']=TimeStamp(serial).timeTime()
                 d['user_name']=user_name


=== ZODB3/ZODB/DemoStorage.py 1.21 => 1.21.2.1 ===
--- ZODB3/ZODB/DemoStorage.py:1.21	Thu Oct  2 14:17:19 2003
+++ ZODB3/ZODB/DemoStorage.py	Tue Oct  7 01:10:31 2003
@@ -148,7 +148,7 @@
                     # effectively, delete the thing
                     self._tindex.append([oid, None, r, None, None])
 
-            return oids
+            return self._serial, oids
 
         finally: self._lock_release()
 
@@ -181,7 +181,7 @@
                 tindex.append([oid, newserial, r, new_vdata, p])
 
 
-            return oids
+            return self._serial, oids
 
         finally: self._lock_release()
 


=== ZODB3/ZODB/DB.py 1.55 => 1.55.2.1 ===
--- ZODB3/ZODB/DB.py:1.55	Thu Oct  2 14:17:19 2003
+++ ZODB3/ZODB/DB.py	Tue Oct  7 01:10:32 2003
@@ -74,7 +74,7 @@
         self._version_cache_size=version_cache_size
         self._version_cache_deactivate_after = version_cache_deactivate_after
 
-        self._miv_cache={}
+        self._miv_cache = {}
 
         # Setup storage
         self._storage=storage
@@ -303,8 +303,7 @@
     def importFile(self, file):
         raise 'Not yet implemented'
 
-    def invalidate(self, oids, connection=None, version='',
-                   rc=sys.getrefcount):
+    def invalidate(self, tid, oids, connection=None, version=''):
         """Invalidate references to a given oid.
 
         This is used to indicate that one of the connections has committed a
@@ -326,21 +325,21 @@
             for cc in allocated:
                 if (cc is not connection and
                     (not version or cc._version==version)):
-                    if rc(cc) <= 3:
+                    if sys.getrefcount(cc) <= 3:
                         cc.close()
-                    cc.invalidate(oids)
+                    cc.invalidate(tid, oids)
 
-        temps=self._temps
-        if temps:
+        if self._temps:
             t=[]
-            for cc in temps:
-                if rc(cc) > 3:
+            for cc in self._temps:
+                if sys.getrefcount(cc) > 3:
                     if (cc is not connection and
-                        (not version or cc._version==version)):
-                        cc.invalidate(oids)
+                        (not version or cc._version == version)):
+                        cc.invalidate(tid, oids)
                     t.append(cc)
-                else: cc.close()
-            self._temps=t
+                else:
+                    cc.close()
+            self._temps = t
 
     def modifiedInVersion(self, oid):
         h=hash(oid)%131
@@ -356,7 +355,7 @@
         return len(self._storage)
 
     def open(self, version='', transaction=None, temporary=0, force=None,
-             waitflag=1):
+             waitflag=1, mvcc=True):
         """Return a object space (AKA connection) to work in
 
         The optional version argument can be used to specify that a
@@ -374,25 +373,25 @@
         try:
 
             if transaction is not None:
-                connections=transaction._connections
+                connections = transaction._connections
                 if connections:
                     if connections.has_key(version) and not temporary:
                         return connections[version]
                 else:
-                    transaction._connections=connections={}
-                transaction=transaction._connections
-
+                    transaction._connections = connections = {}
+                transaction = transaction._connections
 
             if temporary:
                 # This is a temporary connection.
                 # We won't bother with the pools.  This will be
                 # a one-use connection.
-                c=self.klass(
-                    version=version,
-                    cache_size=self._version_cache_size)
+                c = self.klass(version=version,
+                               cache_size=self._version_cache_size,
+                               mvcc=mvcc)
                 c._setDB(self)
                 self._temps.append(c)
-                if transaction is not None: transaction[id(c)]=c
+                if transaction is not None:
+                    transaction[id(c)] = c
                 return c
 
 
@@ -433,18 +432,18 @@
 
 
             if not pool:
-                c=None
+                c = None
                 if version:
                     if self._version_pool_size > len(allocated) or force:
-                        c=self.klass(
-                            version=version,
-                            cache_size=self._version_cache_size)
+                        c = self.klass(version=version,
+                                       cache_size=self._version_cache_size,
+                                       mvcc=mvcc)
                         allocated.append(c)
                         pool.append(c)
                 elif self._pool_size > len(allocated) or force:
-                    c=self.klass(
-                        version=version,
-                        cache_size=self._cache_size)
+                    c = self.klass(version=version,
+                                   cache_size=self._cache_size,
+                                   mvcc=mvcc)
                     allocated.append(c)
                     pool.append(c)
 
@@ -459,7 +458,7 @@
                             pool_lock.release()
                     else: return
 
-            elif len(pool)==1:
+            elif len(pool) == 1:
                 # Taking last one, lock the pool
                 # Note that another thread might grab the lock
                 # before us, so we might actually block, however,
@@ -473,14 +472,15 @@
                     # but it could be higher due to a race condition.
                     pool_lock.release()
 
-            c=pool[-1]
+            c = pool[-1]
             del pool[-1]
             c._setDB(self)
             for pool, allocated in pooll:
                 for cc in pool:
                     cc._incrgc()
 
-            if transaction is not None: transaction[version]=c
+            if transaction is not None:
+                transaction[version] = c
             return c
 
         finally: self._r()
@@ -591,7 +591,8 @@
             d = {}
             for oid in storage.undo(id):
                 d[oid] = 1
-            self.invalidate(d)
+            # XXX I think we need to remove old undo to use mvcc
+            self.invalidate(None, d)
 
     def versionEmpty(self, version):
         return self._storage.versionEmpty(version)
@@ -619,13 +620,13 @@
 
     def commit(self, reallyme, t):
         dest=self._dest
-        oids = self._db._storage.commitVersion(self._version, dest, t)
+        tid, oids = self._db._storage.commitVersion(self._version, dest, t)
         oids = list2dict(oids)
-        self._db.invalidate(oids, version=dest)
+        self._db.invalidate(tid, oids, version=dest)
         if dest:
             # the code above just invalidated the dest version.
             # now we need to invalidate the source!
-            self._db.invalidate(oids, version=self._version)
+            self._db.invalidate(tid, oids, version=self._version)
 
 class AbortVersion(CommitVersion):
     """An object that will see to version abortion
@@ -634,9 +635,9 @@
     """
 
     def commit(self, reallyme, t):
-        version=self._version
-        oids = self._db._storage.abortVersion(version, t)
-        self._db.invalidate(list2dict(oids), version=version)
+        version = self._version
+        tid, oids = self._db._storage.abortVersion(version, t)
+        self._db.invalidate(tid, list2dict(oids), version=version)
 
 
 class TransactionalUndo(CommitVersion):
@@ -650,5 +651,5 @@
     # similarity of rhythm that I think it's justified.
 
     def commit(self, reallyme, t):
-        oids = self._db._storage.transactionalUndo(self._version, t)
-        self._db.invalidate(list2dict(oids))
+        tid, oids = self._db._storage.transactionalUndo(self._version, t)
+        self._db.invalidate(tid, list2dict(oids))


=== ZODB3/ZODB/Connection.py 1.100 => 1.100.2.1 ===
--- ZODB3/ZODB/Connection.py:1.100	Thu Oct  2 14:17:19 2003
+++ ZODB3/ZODB/Connection.py	Tue Oct  7 01:10:32 2003
@@ -21,7 +21,6 @@
 from POSException import ConflictError, ReadConflictError, TransactionError
 from ExtensionClass import Base
 import ExportImport, TmpStore
-from zLOG import LOG, ERROR, BLATHER, WARNING
 from coptimizations import new_persistent_id
 from ConflictResolution import ResolvedSerial
 from Transaction import Transaction, get_transaction
@@ -29,10 +28,17 @@
 
 from cPickle import Unpickler, Pickler
 from cStringIO import StringIO
+import logging
 import sys
 import threading
 from time import time
-from types import StringType, ClassType
+from types import ClassType
+
+_marker = object()
+
+def myhasattr(obj, attr):
+    # builtin hasattr() swallows exceptions
+    return getattr(obj, attr, _marker) is not _marker
 
 global_code_timestamp = 0
 
@@ -55,9 +61,9 @@
 
     The Connection manages movement of objects in and out of object storage.
     """
-    _tmp=None
-    _debug_info=()
-    _opened=None
+    _tmp = None
+    _debug_info = ()
+    _opened = None
     _code_timestamp = 0
     _transaction = None
 
@@ -65,9 +71,12 @@
     # when we close by putting something here.
 
     def __init__(self, version='', cache_size=400,
-                 cache_deactivate_after=60):
+                 cache_deactivate_after=60, mvcc=True):
         """Create a new Connection"""
-        self._version=version
+
+        self._log = logging.getLogger("zodb.conn")
+        
+        self._version = version
         self._cache = cache = PickleCache(self, cache_size)
         if version:
             # Caches for versions end up empty if the version
@@ -99,6 +108,14 @@
         self._invalidated = d = {}
         self._invalid = d.has_key
         self._conflicts = {}
+        self._noncurrent = {}
+
+        # If MVCC is enabled, then _mvcc is True and _txn_time stores
+        # the upper bound on transactions visible to this connection.
+        # That is, all object revisions must be written before _txn_time.
+        # If it is None, then the current revisions are acceptable.
+        self._mvcc = mvcc
+        self._txn_time = None
 
     def getTransaction(self):
         t = self._transaction
@@ -141,7 +158,7 @@
         self._incrgc = None
         self.cacheGC = None
 
-    def __getitem__(self, oid, tt=type(())):
+    def __getitem__(self, oid):
         obj = self._cache.get(oid, None)
         if obj is not None:
             return obj
@@ -157,9 +174,9 @@
 
         klass, args = object
 
-        if type(klass) is tt:
+        if isinstance(klass, tuple):
             module, name = klass
-            klass=self._db._classFactory(self, module, name)
+            klass = self._db._classFactory(self, module, name)
 
         if (args is None or
             not args and not hasattr(klass,'__getinitargs__')):
@@ -177,12 +194,10 @@
         self._cache[oid] = object
         return object
 
-    def _persistent_load(self,oid,
-                        tt=type(())):
-
+    def _persistent_load(self, oid):
         __traceback_info__=oid
 
-        if type(oid) is tt:
+        if isinstance(oid, tuple):
             # Quick instance reference.  We know all we need to know
             # to create the instance wo hitting the db, so go for it!
             oid, klass = oid
@@ -190,9 +205,10 @@
             if obj is not None:
                 return obj
 
-            if type(klass) is tt:
+            if isinstance(klass, tuple):
                 module, name = klass
-                try: klass=self._db._classFactory(self, module, name)
+                try:
+                    klass=self._db._classFactory(self, module, name)
                 except:
                     # Eek, we couldn't get the class. Hm.
                     # Maybe their's more current data in the
@@ -282,11 +298,12 @@
         # Call the close callbacks.
         if self.__onCloseCallbacks is not None:
             for f in self.__onCloseCallbacks:
-                try: f()
-                except:
-                    f=getattr(f, 'im_self', f)
-                    LOG('ZODB',ERROR, 'Close callback failed for %s' % f,
-                        error=sys.exc_info())
+                try:
+                    f()
+                except: # callbacks may raise anything (even string exceptions)
+                    f = getattr(f, 'im_self', f)
+                    self._log.error("Close callback failed for %s", f,
+                                    exc_info=sys.exc_info())
             self.__onCloseCallbacks = None
         self._storage = self._tmp = self.new_oid = self._opened = None
         self._debug_info = ()
@@ -438,8 +455,8 @@
         if tmp is None: return
         src=self._storage
 
-        LOG('ZODB', BLATHER,
-            'Commiting subtransaction of size %s' % src.getSize())
+        self._log.debug("Commiting subtransaction of size %s",
+                        src.getSize())
 
         self._storage=tmp
         self._tmp=None
@@ -490,8 +507,6 @@
                 del o._p_jar
                 del o._p_oid
 
-    #XXX
-
     def db(self): return self._db
 
     def getVersion(self): return self._version
@@ -499,7 +514,7 @@
     def isReadOnly(self):
         return self._storage.isReadOnly()
 
-    def invalidate(self, oids):
+    def invalidate(self, tid, oids):
         """Invalidate a set of oids.
 
         This marks the oid as invalid, but doesn't actually invalidate
@@ -508,6 +523,8 @@
         """
         self._inv_lock.acquire()
         try:
+            if self._txn_time is None:
+                self._txn_time = tid
             self._invalidated.update(oids)
         finally:
             self._inv_lock.release()
@@ -517,13 +534,15 @@
         try:
             self._cache.invalidate(self._invalidated)
             self._invalidated.clear()
+            self._txn_time = None
         finally:
             self._inv_lock.release()
         # Now is a good time to collect some garbage
         self._cache.incrgc()
 
     def modifiedInVersion(self, oid):
-        try: return self._db.modifiedInVersion(oid)
+        try:
+            return self._db.modifiedInVersion(oid)
         except KeyError:
             return self._version
 
@@ -547,55 +566,85 @@
         if self._storage is None:
             msg = ("Shouldn't load state for %s "
                    "when the connection is closed" % oid_repr(oid))
-            LOG('ZODB', ERROR, msg)
+            self._log.error(msg)
             raise RuntimeError(msg)
 
         try:
-            # Avoid reading data from a transaction that committed
-            # after the current transaction started, as that might
-            # lead to mixing of cached data from earlier transactions
-            # and new inconsistent data.
-            #
-            # Wait for check until after data is loaded from storage
-            # to avoid time-of-check to time-of-use race.
-            p, serial = self._storage.load(oid, self._version)
-            self._load_count = self._load_count + 1
-            invalid = self._is_invalidated(obj)
-            self._set_ghost_state(obj, p)
-            obj._p_serial = serial
-            if invalid:
-                self._handle_independent(obj)
+            self._setstate(obj)
         except ConflictError:
             raise
         except:
-            LOG('ZODB', ERROR,
-                "Couldn't load state for %s" % oid_repr(oid),
-                error=sys.exc_info())
+            self._log.error("Couldn't load state for %s", oid_repr(oid),
+                            exc_info=sys.exc_info())
             raise
 
-    def _is_invalidated(self, obj):
-        # Helper method for setstate() covers three cases:
-        # returns false if obj is valid
-        # returns true if obj was invalidation, but is independent
-        # otherwise, raises ConflictError for invalidated objects
+    def _setstate(self, obj):
+        # Helper for setstate(), which provides logging of failures.
+
+        # The control flow is complicated here to avoid loading an
+        # object revision that we are sure we aren't going to use.  As
+        # a result, invalidation tests occur before and after the
+        # load.  We can only be sure about invalidations after the
+        # load.
+
+        # If an object has been invalidated, there are several cases
+        # to consider:
+        # 1. Check _p_independent()
+        # 2. Try MVCC
+        # 3. Raise ConflictError.
+
+        # Does anything actually use _p_independent()?  It would simplify
+        # the code if we could drop support for it.
+
+        # There is a harmless data race with self._invalidated.  A
+        # dict update could go on in another thread, but we don't care
+        # because we have to check again after the load anyway.
+        if (obj._p_oid in self._invalidated
+            and not myhasattr(obj, "_p_independent")):
+            # If the object has _p_independent(), we will handle it below.
+            if not (self._mvcc and self._setstate_noncurrent(obj)):
+                self.getTransaction().register(obj)
+                self._conflicts[obj._p_oid] = 1
+                raise ReadConflictError(object=obj)
+
+        p, serial = self._storage.load(obj._p_oid, self._version)
+        self._load_count += 1
+
         self._inv_lock.acquire()
         try:
-            if self._invalidated.has_key(obj._p_oid):
-                # Defer _p_independent() call until state is loaded.
-                ind = getattr(obj, "_p_independent", None)
-                if ind is not None:
-                    # Defer _p_independent() call until state is loaded.
-                    return 1
-                else:
-                    self.getTransaction().register(obj)
-                    self._conflicts[obj._p_oid] = 1
-                    raise ReadConflictError(object=obj)
-            else:
-                return 0
+            invalid = obj._p_oid in self._invalidated
         finally:
             self._inv_lock.release()
+            
+        if invalid:
+            if myhasattr(obj, "_p_independent"):
+                # This call will raise a ReadConflictError if something
+                # goes wrong
+                self._handle_independent(obj)
+            elif not (self._mvcc and self._setstate_noncurrent(obj)):
+                self.getTransaction().register(obj)
+                self._conflicts[obj._p_oid] = 1
+                raise ReadConflictError(object=obj)
+                
+        self._set_ghost_state(obj, p, serial)
+
+    def _setstate_noncurrent(self, obj):
+        """Set state using non-current data.
+
+        Return True if state was available, False if not.
+        """
+        try:
+            t = self._storage.loadNonCurrent(obj._p_oid, self._txn_time)
+        except KeyError:
+            return False
+        if t is None:
+            return False
+        data, serial, start, end = t
+        assert start < end == self._txn_time, (start, end, self._txn_time)
+        self._noncurrent[obj._p_oid] = True
+        self._set_ghost_state(obj, data, serial)
 
-    def _set_ghost_state(self, obj, p):
+    def _set_ghost_state(self, obj, p, serial):
         file = StringIO(p)
         unpickler = Unpickler(file)
         unpickler.persistent_load = self._persistent_load
@@ -607,6 +656,7 @@
             obj.update(state)
         else:
             setstate(state)
+        obj._p_serial = serial
 
     def _handle_independent(self, obj):
         # Helper method for setstate() handles possibly independent objects
@@ -649,9 +699,9 @@
             klass, args = copy
 
             if klass is not ExtensionKlass:
-                LOG('ZODB',ERROR,
-                    "Unexpected klass when setting class state on %s"
-                    % getattr(object,'__name__','(?)'))
+                self._log.error(
+                    "Unexpected klass when setting class state on %s",
+                    getattr(object, "__name__", "(?)"))
                 return
 
             copy = klass(*args)
@@ -663,7 +713,7 @@
             object._p_changed=0
             object._p_serial=serial
         except:
-            LOG('ZODB',ERROR, 'setklassstate failed', error=sys.exc_info())
+            self._log.error("setklassstate failed", exc_info=sys.exc_info())
             raise
 
     def tpc_abort(self, transaction):
@@ -720,7 +770,7 @@
 
         if not store_return:
             return
-        if isinstance(store_return, StringType):
+        if isinstance(store_return, str):
             assert oid is not None
             self._handle_one_serial(oid, store_return, change)
         else:
@@ -728,7 +778,7 @@
                 self._handle_one_serial(oid, serial, change)
 
     def _handle_one_serial(self, oid, serial, change):
-        if not isinstance(serial, StringType):
+        if not isinstance(serial, str):
             raise serial
         obj = self._cache.get(oid, None)
         if obj is None:
@@ -754,11 +804,11 @@
             self._storage._creating[:0]=self._creating
             del self._creating[:]
         else:
-            def callback():
+            def callback(tid):
                 d = {}
                 for oid in self._modified:
                     d[oid] = 1
-                self._db.invalidate(d, self)
+                self._db.invalidate(tid, d, self)
             self._storage.tpc_finish(transaction, callback)
 
         self._conflicts.clear()


=== ZODB3/ZODB/BaseStorage.py 1.36 => 1.36.2.1 ===
--- ZODB3/ZODB/BaseStorage.py:1.36	Thu Oct  2 14:17:19 2003
+++ ZODB3/ZODB/BaseStorage.py	Tue Oct  7 01:10:32 2003
@@ -56,12 +56,12 @@
     def abortVersion(self, src, transaction):
         if transaction is not self._transaction:
             raise POSException.StorageTransactionError(self, transaction)
-        return []
+        return self._serial, []
 
     def commitVersion(self, src, dest, transaction):
         if transaction is not self._transaction:
             raise POSException.StorageTransactionError(self, transaction)
-        return []
+        return self._serial, []
 
     def close(self):
         pass
@@ -199,7 +199,7 @@
                 return
             try:
                 if f is not None:
-                    f()
+                    f(self._serial)
                 u, d, e = self._ude
                 self._finish(self._serial, u, d, e)
                 self._clear_temp()
@@ -245,6 +245,33 @@
     def loadSerial(self, oid, serial):
         raise POSException.Unsupported, (
             "Retrieval of historical revisions is not supported")
+
+    def loadNonCurrent(self, oid, tid):
+        """Return most recent revision of oid before tid committed.
+
+        Returns (data, serial, start, end), or None if history() has no
+        non-version revision of oid older than tid.
+        """
+        n = 2
+        start_time = end_time = None
+        while start_time is None:
+            # The history() approach is a hack: history() reports
+            # serialnos, not tids, and we pretend they are the same.
+            # It also says nothing about transaction status, so a
+            # revision before the pack time can't honor the MVCC request.
+            # Note: history() returns at most n records, newest first.
+            L = self.history(oid, "", n, lambda d: not d["version"])
+            for d in L:
+                if d["serial"] < tid:
+                    start_time = d["serial"]
+                    break
+                end_time = d["serial"]
+            if start_time is None and len(L) < n:
+                # Fewer than n records means history is exhausted.
+                return None
+            n *= 2
+        data = self.loadSerial(oid, start_time)
+        return data, start_time, start_time, end_time
 
     def getExtensionMethods(self):
         """getExtensionMethods




More information about the Zope-Checkins mailing list