[Zodb-checkins] CVS: ZODB3/ZEO/zrpc - client.py:1.8

Guido van Rossum guido@python.org
Wed, 11 Sep 2002 15:20:51 -0400


Update of /cvs-repository/ZODB3/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv24692

Modified Files:
	client.py 
Log Message:
*Properly* protect self.thread (renamed from self._thread) with a
lock.  The lock is now never held for a long time, only while
self.thread is being assigned or inspected, and while a thread is
being created, started or stopped.  Waiting for an event or joining a
thread is now done without holding the lock.  There is no longer a
fear of AttributeError.

When a thread join doesn't succeed within 30 seconds, a message is
logged, and depending on the situation, the join is retried.


=== ZODB3/ZEO/zrpc/client.py 1.7 => 1.8 ===
--- ZODB3/ZEO/zrpc/client.py:1.7	Tue Sep 10 18:08:44 2002
+++ ZODB3/ZEO/zrpc/client.py	Wed Sep 11 15:20:50 2002
@@ -37,10 +37,10 @@
         self.connected = 0
         self.connection = None
         self.closed = 0
-        # If _thread is not None, then there is a helper thread
-        # attempting to connect.  _thread is protected by _connect_lock.
-        self._thread = None
-        self._connect_lock = threading.Lock()
+        # If thread is not None, then there is a helper thread
+        # attempting to connect.  thread is protected by thread_lock.
+        self.thread = None
+        self.thread_lock = threading.Lock()
         self.trigger = None
         self.thr_async = 0
         ThreadedAsync.register_loop_callback(self.set_async)
@@ -82,15 +82,17 @@
     def close(self):
         """Prevent ConnectionManager from opening new connections"""
         self.closed = 1
-        self._connect_lock.acquire()
+        self.thread_lock.acquire()
         try:
-            if self._thread is not None:
-                # XXX race on _thread
-                self._thread.stop()
-                self._thread.join(30)
-                assert not self._thread.isAlive()
+            t = self.thread
+            if t is not None:
+                t.stop()
         finally:
-            self._connect_lock.release()
+            self.thread_lock.release()
+        if t is not None:
+            t.join(30)
+            if t.isAlive():
+                log("ConnectionManager.close(): self.thread.join() timed out")
         if self.connection:
             self.connection.close()
         if self.trigger is not None:
@@ -126,41 +128,45 @@
 
         # XXX will a single attempt take too long?
         self.connect()
+        self.thread_lock.acquire()
         try:
-            event = self._thread.one_attempt
-        except AttributeError:
-            # An AttributeError means that (1) _thread is None and (2)
-            # as a consquence of (1) that the connect thread has
-            # already exited.
-            pass
-        else:
+            t = self.thread
+        finally:
+            self.thread_lock.release()
+        if t is not None:
+            event = t.one_attempt
             event.wait()
         return self.connected
 
     def connect(self, sync=0):
         if self.connected == 1:
             return
-        self._connect_lock.acquire()
+        self.thread_lock.acquire()
         try:
-            if self._thread is None:
+            t = self.thread
+            if t is None:
                 log("starting thread to connect to server")
-                self._thread = ConnectThread(self, self.client, self.addr,
-                                             self.tmin, self.tmax)
-                self._thread.start()
-            if sync:
-                try:
-                    self._thread.join()
-                except AttributeError:
-                    # probably means the thread exited quickly
-                    pass
+                self.thread = t = ConnectThread(self, self.client, self.addr,
+                                                self.tmin, self.tmax)
+                t.start()
         finally:
-            self._connect_lock.release()
+            self.thread_lock.release()
+        if sync:
+            t.join(30)
+            while t.isAlive():
+                log("ConnectionManager.connect(sync=1): "
+                    "self.thread.join() timed out")
+                t.join(30)
 
     def connect_done(self, c):
         log("connect_done()")
         self.connected = 1
         self.connection = c
-        self._thread = None
+        self.thread_lock.acquire()
+        try:
+            self.thread = None
+        finally:
+            self.thread_lock.release()
 
     def notify_closed(self):
         self.connected = 0