[Zope-Checkins] CVS: ZODB3/ZEO - StorageServer.py:1.86

Jeremy Hylton jeremy@zope.com
Tue, 7 Jan 2003 17:13:29 -0500


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv27954/ZEO

Modified Files:
	StorageServer.py 
Log Message:
Add per-storage transaction timeout feature and a couple of tests.


=== ZODB3/ZEO/StorageServer.py 1.85 => 1.86 ===
--- ZODB3/ZEO/StorageServer.py:1.85	Tue Jan  7 14:24:41 2003
+++ ZODB3/ZEO/StorageServer.py	Tue Jan  7 17:12:57 2003
@@ -33,6 +33,7 @@
 from ZEO.CommitLog import CommitLog
 from ZEO.zrpc.server import Dispatcher
 from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay
+from ZEO.zrpc.trigger import trigger
 
 import zLOG
 from ZODB.POSException import StorageError, StorageTransactionError
@@ -72,7 +73,8 @@
     ManagedServerConnectionClass = ManagedServerConnection
 
     def __init__(self, addr, storages, read_only=0,
-                 invalidation_queue_size=100):
+                 invalidation_queue_size=100,
+                 transaction_timeout=None):
         """StorageServer constructor.
 
         This is typically invoked from the start.py script.
@@ -104,6 +106,11 @@
             N == invalidation_queue_size.  This queue is used to
             speed client cache verification when a client disconnects
             for a short period of time.
+
+        transaction_timeout -- The maximum amount of time to wait for
+            a transaction to commit after acquiring the storage lock.
+            If the transaction takes too long, the client connection
+            will be closed and the transaction aborted.
         """
 
         self.addr = addr
@@ -125,6 +132,15 @@
         self.dispatcher = self.DispatcherClass(addr,
                                                factory=self.new_connection,
                                                reuse_addr=1)
+        self.timeouts = {}
+        for name in self.storages.keys():
+            if transaction_timeout is None:
+                # An object with no-op methods
+                timeout = StubTimeoutThread()
+            else:
+                timeout = TimeoutThread(transaction_timeout)
+                timeout.start()
+            self.timeouts[name] = timeout
 
     def new_connection(self, sock, addr):
         """Internal: factory to create a new connection.
@@ -147,11 +163,14 @@
         list of current connections for that storage; this information
         is needed to handle invalidation.  This function updates this
         dictionary.
+
+        Returns the timeout object for the appropriate storage.
         """
         l = self.connections.get(storage_id)
         if l is None:
             l = self.connections[storage_id] = []
         l.append(conn)
+        return self.timeouts[storage_id]
 
     def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
         """Internal: broadcast info and invalidations to clients.
@@ -216,6 +235,7 @@
 
         This is only called from the test suite, AFAICT.
         """
+        self.timeout.stop()
         self.dispatcher.close()
         for storage in self.storages.values():
             storage.close()
@@ -246,6 +266,7 @@
 
     def __init__(self, server, read_only=0):
         self.server = server
+        self.timeout = None
         self.connection = None
         self.client = None
         self.storage = None
@@ -350,7 +371,7 @@
         self.storage_id = storage_id
         self.storage = storage
         self.setup_delegation()
-        self.server.register_connection(storage_id, self)
+        self.timeout = self.server.register_connection(storage_id, self)
 
     def get_info(self):
         return {'length': len(self.storage),
@@ -512,6 +533,7 @@
                                    self.invalidated, self.get_size_info())
         self.transaction = None
         self.locked = 0
+        self.timeout.end(self)
         # Return the tid, for cache invalidation optimization
         self._handle_waiting()
         return tid
@@ -523,6 +545,7 @@
             self.storage.tpc_abort(self.transaction)
         self.transaction = None
         self.locked = 0
+        self.timeout.end(self)
         self._handle_waiting()
 
     def _abort(self):
@@ -584,6 +607,7 @@
     def _tpc_begin(self, txn, tid, status):
         self.locked = 1
         self.storage.tpc_begin(txn, tid, status)
+        self.timeout.begin(self)
 
     def _store(self, oid, serial, data, version):
         try:
@@ -701,6 +725,86 @@
             return 0
         else:
             return 1
+
+class StubTimeoutThread:
+
+    def begin(self, client):
+        pass
+
+    def end(self, client):
+        pass
+
+    def stop(self):
+        pass
+
+class TimeoutThread(threading.Thread):
+    """Monitors transaction progress and generates timeouts."""
+
+    def __init__(self, timeout):
+        threading.Thread.__init__(self)
+        self.setDaemon(1)
+        self._timeout = timeout
+        self._client = None
+        self._deadline = None
+        self._stop = 0
+        self._active = threading.Event()
+        self._lock = threading.Lock()
+        self._trigger = trigger()
+
+    def stop(self):
+        self._stop = 1
+
+    def begin(self, client):
+        self._lock.acquire()
+        try:
+            self._active.set()
+            self._client = client
+            self._deadline = time.time() + self._timeout
+        finally:
+            self._lock.release()
+
+    def end(self, client):
+        # The ZEOStorage will call this method for every aborted
+        # transaction, regardless of whether the transaction started
+        # the 2PC.  Ignore here if 2PC never began.
+        if client is not self._client:
+            return
+        self._lock.acquire()
+        try:
+            self._active.clear()
+            self._client = None
+            self._deadline = None
+        finally:
+            self._lock.release()
+
+    def run(self):
+        while not self._stop:
+            self._active.wait()
+            self._lock.acquire()
+            try:
+                howlong = self._deadline - time.time()
+            finally:
+                self._lock.release()
+            if howlong <= 0:
+                self.timeout()
+            else:
+                time.sleep(howlong)
+
+    def timeout(self):
+        self._lock.acquire()
+        try:
+            client = self._client
+            deadline = self._deadline
+            self._active.clear()
+            self._client = None
+            self._deadline = None
+        finally:
+            self._lock.release()
+        if client is None:
+            return
+        elapsed = time.time() - (deadline - self._timeout)
+        client.log("Transaction timeout after %d seconds" % int(elapsed))
+        self._trigger.pull_trigger(lambda: client.connection.close())
 
 def run_in_thread(method, *args):
     t = SlowMethodThread(method, args)