[Zodb-checkins] CVS: StandaloneZODB/ZEO - StorageServer.py:1.28.2.7

Jeremy Hylton jeremy@zope.com
Thu, 7 Mar 2002 21:16:55 -0500


Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv9224

Modified Files:
      Tag: zeo-1_0-branch
	StorageServer.py 
Log Message:
Refactor and fix distributed commit lock implementation.

This is a candidate for ZEO 1.0b6.

This fixes a bug reported by Chris McDonough on zodb-dev.  The bug
caused the storage server to stop committing transactions for a
storage.  The details of the bug are in the checkCommitLock test
cases. 

The following comment explains the new code.

    # distributed commit lock support methods

    # Only one client at a time can commit a transaction on a
    # storage.  If one client is committing a transaction, and a
    # second client sends a tpc_begin(), then second client is queued.
    # When the first transaction finishes, either by abort or commit,
    # the request from the queued client must be handled.

    # It is important that this code be robust.  If a queued
    # transaction is not restarted, the server will stop processing
    # new transactions.

    # This lock is implemented by storing the queued requests in a
    # list on the storage object.  The list contains:
    #     a callable object to resume request
    #     arguments to that object
    #     a callable object to handle errors during resume

    # XXX I am not sure that the commitlock_resume() method is
    # sufficiently paranoid.


=== StandaloneZODB/ZEO/StorageServer.py 1.28.2.6 => 1.28.2.7 ===
 from ZEO import trigger
 from ZEO import asyncwrap
+from ZEO.smac import Disconnected
 from types import StringType
 
 class StorageServerError(POSException.StorageError): pass
@@ -133,6 +134,8 @@
         self.__storages=storages
         for n, s in storages.items():
             init_storage(s)
+            # Create a waiting list to support the distributed commit lock.
+            s._waiting = []
 
         self.__connections={}
         self.__get_connections=self.__connections.get
@@ -280,6 +283,7 @@
             # This is the first communication from the client
             self.__storage, self.__storage_id = (
                 self.__server.register_connection(self, message))
+
             # Send info back asynchronously, so client need not ask
             self.message_output('S'+dump(self.get_info(), 1))
             return
@@ -501,39 +505,76 @@
             return oids
         return ()
 
-    def tpc_abort(self, id):
-        t=self._transaction
-        if t is None or id != t.id: return
-        r=self.__storage.tpc_abort(t)
+    # distributed commit lock support methods
 
-        storage=self.__storage
-        try: waiting=storage.__waiting
-        except: waiting=storage.__waiting=[]
+    # Only one client at a time can commit a transaction on a
+    # storage.  If one client is committing a transaction, and a
+    # second client sends a tpc_begin(), then second client is queued.
+    # When the first transaction finishes, either by abort or commit,
+    # the request from the queued client must be handled.
+
+    # It is important that this code be robust.  If a queued
+    # transaction is not restarted, the server will stop processing
+    # new transactions.
+
+    # This lock is implemented by storing the queued requests in a
+    # list on the storage object.  The list contains:
+    #     a callable object to resume request
+    #     arguments to that object
+    #     a callable object to handle errors during resume
+
+    # XXX I am not sure that the commitlock_resume() method is
+    # sufficiently paranoid.
+
+    def commitlock_suspend(self, resume, args, onerror):
+        self.__storage._waiting.append((resume, args, onerror))
+
+    def commitlock_resume(self):
+        waiting = self.__storage._waiting
         while waiting:
-            f, args = waiting.pop(0)
-            if apply(f,args): break
+            resume, args, onerror = waiting.pop(0)
+            try:
+                if apply(resume, args):
+                    break
+            except Disconnected:
+                # A disconnected error isn't an unexpected error.
+                # There should be no need to log it, because the
+                # disconnect will have generated its own log event.
+                onerror()
+            except:
+                LOG('ZEO Server', ERROR,
+                    "Unexpected error handling queued tpc_begin()",
+                    error=sys.exc_info())
+                onerror()
 
-        self._transaction=None
-        self.__invalidated=[]
+    def tpc_abort(self, id):
+        t = self._transaction
+        if t is None or id != t.id:
+            return
+        r = self.__storage.tpc_abort(t)
+
+        self._transaction = None
+        self.__invalidated = []
+        self.commitlock_resume()
         
     def unlock(self):
-        if self.__closed: return
+        if self.__closed:
+            return
         self.message_output('UN.')
 
     def tpc_begin(self, id, user, description, ext):
-        t=self._transaction
+        t = self._transaction
         if t is not None:
-            if id == t.id: return
+            if id == t.id:
+                return
             else:
                 raise StorageServerError(
                     "Multiple simultaneous tpc_begin requests from the same "
                     "client."
                     )
-        storage=self.__storage
+        storage = self.__storage
         if storage._transaction is not None:
-            try: waiting=storage.__waiting
-            except: waiting=storage.__waiting=[]
-            waiting.append((self.unlock, ()))
+            self.commitlock_suspend(self.unlock, (), self.close)
             return 1 # Return a flag indicating a lock condition.
             
         self._transaction=t=Transaction()
@@ -552,9 +593,9 @@
         if storage._transaction is None:
             self.try_again_sync(id, user, description, ext)
         else:
-            try: waiting=storage.__waiting
-            except: waiting=storage.__waiting=[]
-            waiting.append((self.try_again_sync, (id, user, description, ext)))
+            self.commitlock_suspend(self.try_again_sync,
+                                    (id, user, description, ext),
+                                    self.close)
 
         return _noreturn
         
@@ -572,24 +613,21 @@
         return 1
 
     def tpc_finish(self, id, user, description, ext):
-        t=self._transaction
-        if id != t.id: return
+        t = self._transaction
+        if id != t.id:
+            return
 
-        storage=self.__storage
-        r=storage.tpc_finish(t)
-        
-        try: waiting=storage.__waiting
-        except: waiting=storage.__waiting=[]
-        while waiting:
-            f, args = waiting.pop(0)
-            if apply(f,args): break
+        storage = self.__storage
+        r = storage.tpc_finish(t)
 
-        self._transaction=None
+        self._transaction = None
         if self.__invalidated:
             self.__server.invalidate(self, self.__storage_id,
                                      self.__invalidated,
                                      self.get_size_info())
-            self.__invalidated=[]
+            self.__invalidated = []
+            
+        self.commitlock_resume()
 
 def init_storage(storage):
     if not hasattr(storage,'tpc_vote'): storage.tpc_vote=lambda *args: None