[Zodb-checkins] SVN: ZODB/trunk/src/ ZEO clients (``ClientStorage`` instances) now work in forked processes,

Jim Fulton jim at zope.com
Fri Jan 29 17:47:23 EST 2010


Log message for revision 108653:
  ZEO clients (``ClientStorage`` instances) now work in forked processes,
  including those created via ``multiprocessing.Process`` instances.
  
  This entailed giving each client storage it's own networking thread.
  

Changed:
  U   ZODB/trunk/src/CHANGES.txt
  U   ZODB/trunk/src/ZEO/tests/testZEO.py
  U   ZODB/trunk/src/ZEO/zrpc/client.py
  U   ZODB/trunk/src/ZEO/zrpc/connection.py

-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt	2010-01-29 22:47:19 UTC (rev 108652)
+++ ZODB/trunk/src/CHANGES.txt	2010-01-29 22:47:22 UTC (rev 108653)
@@ -18,6 +18,9 @@
   raise a StorageTransactionError when invalid transactions are passed
   to tpc_begin, tpc_vote, or tpc_finish.
 
+- ZEO clients (``ClientStorage`` instances) now work in forked processes,
+  including those created via ``multiprocessing.Process`` instances.
+
 - Broken objects now provide the IBroken interface.
 
 Bugs Fixed

Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py	2010-01-29 22:47:19 UTC (rev 108652)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py	2010-01-29 22:47:22 UTC (rev 108653)
@@ -52,8 +52,6 @@
 
 logger = logging.getLogger('ZEO.tests.testZEO')
 
-ZEO.zrpc.connection.start_client_thread()
-
 class DummyDB:
     def invalidate(self, *args):
         pass
@@ -389,14 +387,17 @@
 
     def setUp(self):
         # Crank down the select frequency
-        self.__old_client_timeout = ZEO.zrpc.connection.client_timeout
-        ZEO.zrpc.connection.client_timeout = 0.1
-        ZEO.zrpc.connection.client_trigger.pull_trigger()
+        self.__old_client_timeout = ZEO.zrpc.client.client_timeout
+        ZEO.zrpc.client.client_timeout = self.__client_timeout
         ZEO.tests.ConnectionTests.CommonSetupTearDown.setUp(self)
 
+    __client_timeouts = 0
+    def __client_timeout(self):
+        self.__client_timeouts += 1
+        return .1
+
     def tearDown(self):
-        ZEO.zrpc.connection.client_timeout = self.__old_client_timeout
-        ZEO.zrpc.connection.client_trigger.pull_trigger()
+        ZEO.zrpc.client.client_timeout = self.__old_client_timeout
         ZEO.tests.ConnectionTests.CommonSetupTearDown.tearDown(self)
 
     def getConfig(self, path, create, read_only):
@@ -405,11 +406,11 @@
     def checkHeartbeatWithServerClose(self):
         # This is a minimal test that mainly tests that the heartbeat
         # function does no harm.
-        client_timeout_count = ZEO.zrpc.connection.client_timeout_count
         self._storage = self.openClientStorage()
-        time.sleep(1) # allow some time for the select loop to fire a few times
-        self.assert_(ZEO.zrpc.connection.client_timeout_count
-                     > client_timeout_count)
+        client_timeouts = self.__client_timeouts
+        forker.wait_until('got a timeout',
+                          lambda : self.__client_timeouts > client_timeouts
+                          )
         self._dostore()
 
         if hasattr(os, 'kill'):
@@ -419,23 +420,10 @@
         else:
             self.shutdownServer()
 
-        for i in range(91):
-            # wait for disconnection
-            if not self._storage.is_connected():
-                break
-            time.sleep(0.1)
-        else:
-            raise AssertionError("Didn't detect server shutdown in 5 seconds")
-
-    def checkHeartbeatWithClientClose(self):
-        # This is a minimal test that mainly tests that the heartbeat
-        # function does no harm.
-        client_timeout_count = ZEO.zrpc.connection.client_timeout_count
-        self._storage = self.openClientStorage()
+        forker.wait_until('disconnected',
+                          lambda : not self._storage.is_connected()
+                          )
         self._storage.close()
-        time.sleep(1) # allow some time for the select loop to fire a few times
-        self.assert_(ZEO.zrpc.connection.client_timeout_count
-                     > client_timeout_count)
 
 
 class ZRPCConnectionTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
@@ -451,27 +439,27 @@
             def writable(self):
                 raise SystemError("I'm evil")
 
-        log = []
-        ZEO.zrpc.connection.client_logger.critical = (
-            lambda m, *a, **kw: log.append((m % a, kw))
-            )
+        import zope.testing.loggingsupport
+        handler = zope.testing.loggingsupport.InstalledHandler(
+            'ZEO.zrpc.client')
 
-        ZEO.zrpc.connection.client_map[None] = Evil()
+        self._storage._rpc_mgr.map[None] = Evil()
 
         try:
-            ZEO.zrpc.connection.client_trigger.pull_trigger()
+            self._storage._rpc_mgr.trigger.pull_trigger()
         except DisconnectedError:
             pass
 
-        time.sleep(.1)
-        self.failIf(self._storage.is_connected())
-        self.assertEqual(len(ZEO.zrpc.connection.client_map), 1)
-        del ZEO.zrpc.connection.client_logger.critical
-        self.assertEqual(log[0][0], 'The ZEO client loop failed.')
-        self.assert_('exc_info' in log[0][1])
-        self.assertEqual(log[1][0], "Couldn't close a dispatcher.")
-        self.assert_('exc_info' in log[1][1])
+        forker.wait_until(
+            'disconnected',
+            lambda : not self._storage.is_connected()
+            )
 
+        log = str(handler)
+        handler.uninstall()
+        self.assert_("ZEO client loop failed" in log)
+        self.assert_("Couldn't close a dispatcher." in log)
+
     def checkExceptionLogsAtError(self):
         # Test the exceptions are logged at error
         self._storage = self.openClientStorage()
@@ -1201,9 +1189,12 @@
 
 def client_asyncore_thread_has_name():
     """
+    >>> addr, _ = start_server()
+    >>> db = ZEO.DB(addr)
     >>> len([t for t in threading.enumerate()
-    ...      if t.getName() == 'ZEO.zrpc.connection'])
+    ...      if ' zeo client networking thread' in t.getName()])
     1
+    >>> db.close()
     """
 
 def runzeo_without_configfile():
@@ -1260,6 +1251,37 @@
     >>> thread.join(1)
     """
 
+if sys.version_info >= (2, 6):
+    import multiprocessing
+
+    def work_with_multiprocessing_process(name, addr, q):
+        conn = ZEO.connection(addr)
+        q.put((name, conn.root.x))
+        conn.close()
+
+    def work_with_multiprocessing():
+        """Client storage should work with multi-processing.
+
+        >>> import StringIO
+        >>> sys.stdin = StringIO.StringIO()
+        >>> addr, _ = start_server()
+        >>> conn = ZEO.connection(addr)
+        >>> conn.root.x = 1
+        >>> transaction.commit()
+        >>> q = multiprocessing.Queue()
+        >>> processes = [multiprocessing.Process(
+        ...     target=work_with_multiprocessing_process,
+        ...     args=(i, addr, q))
+        ...     for i in range(3)]
+        >>> _ = [p.start() for p in processes]
+        >>> sorted(q.get(timeout=60) for p in processes)
+        [(0, 1), (1, 1), (2, 1)]
+
+        >>> _ = [p.join(30) for p in processes]
+        >>> conn.close()
+        """
+
+
 slow_test_classes = [
     BlobAdaptedFileStorageTests, BlobWritableCacheTests,
     DemoStorageTests, FileStorageTests, MappingStorageTests,

Modified: ZODB/trunk/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/client.py	2010-01-29 22:47:19 UTC (rev 108652)
+++ ZODB/trunk/src/ZEO/zrpc/client.py	2010-01-29 22:47:22 UTC (rev 108653)
@@ -11,11 +11,15 @@
 # FOR A PARTICULAR PURPOSE
 #
 ##############################################################################
+import asyncore
 import errno
+import logging
 import select
 import socket
+import sys
 import threading
 import time
+import traceback
 import types
 import logging
 
@@ -24,15 +28,113 @@
 
 from ZEO.zrpc.log import log
 import ZEO.zrpc.trigger
-from ZEO.zrpc.connection import ManagedClientConnection, start_client_thread
+from ZEO.zrpc.connection import ManagedClientConnection
 
+def client_timeout():
+    return 30.0
+
+def client_loop(map):
+    read = asyncore.read
+    write = asyncore.write
+    _exception = asyncore._exception
+
+    while map:
+        try:
+
+            # The next two lines intentionally don't use
+            # iterators. Other threads can close dispatchers, causeing
+            # the socket map to shrink.
+            r = e = map.keys()
+            w = [fd for (fd, obj) in map.items() if obj.writable()]
+
+            try:
+                r, w, e = select.select(r, w, e, client_timeout())
+            except select.error, err:
+                if err[0] != errno.EINTR:
+                    if err[0] == errno.EBADF:
+
+                        # If a connection is closed while we are
+                        # calling select on it, we can get a bad
+                        # file-descriptor error.  We'll check for this
+                        # case by looking for entries in r and w that
+                        # are not in the socket map.
+
+                        if [fd for fd in r if fd not in map]:
+                            continue
+                        if [fd for fd in w if fd not in map]:
+                            continue
+
+                    raise
+                else:
+                    continue
+
+            if not map:
+                break
+
+            if not (r or w or e):
+                # The line intentionally doesn't use iterators. Other
+                # threads can close dispatchers, causeing the socket
+                # map to shrink.
+                for obj in map.values():
+                    if isinstance(obj, ManagedClientConnection):
+                        # Send a heartbeat message as a reply to a
+                        # non-existent message id.
+                        try:
+                            obj.send_reply(-1, None)
+                        except DisconnectedError:
+                            pass
+                continue
+
+            for fd in r:
+                obj = map.get(fd)
+                if obj is None:
+                    continue
+                read(obj)
+
+            for fd in w:
+                obj = map.get(fd)
+                if obj is None:
+                    continue
+                write(obj)
+
+            for fd in e:
+                obj = map.get(fd)
+                if obj is None:
+                    continue
+                _exception(obj)
+
+        except:
+            if map:
+                try:
+                    logging.getLogger(__name__+'.client_loop').critical(
+                        'A ZEO client loop failed.',
+                        exc_info=sys.exc_info())
+                except:
+                    pass
+
+                for fd, obj in map.items():
+                    if not hasattr(obj, 'mgr'):
+                        continue
+                    try:
+                        obj.mgr.client.close()
+                    except:
+                        map.pop(fd, None)
+                        try:
+                            logging.getLogger(__name__+'.client_loop'
+                                              ).critical(
+                                "Couldn't close a dispatcher.",
+                                exc_info=sys.exc_info())
+                        except:
+                            pass
+
+
 class ConnectionManager(object):
     """Keeps a connection up over time"""
 
     def __init__(self, addrs, client, tmin=1, tmax=180):
-        start_client_thread()
+        self.client = client
+        self._start_asyncore_loop()
         self.addrlist = self._parse_addrs(addrs)
-        self.client = client
         self.tmin = min(tmin, tmax)
         self.tmax = tmax
         self.cond = threading.Condition(threading.Lock())
@@ -42,6 +144,15 @@
         # attempting to connect.
         self.thread = None # Protected by self.cond
 
+    def _start_asyncore_loop(self):
+        self.map = {}
+        self.trigger = ZEO.zrpc.trigger.trigger(self.map)
+        self.loop_thread = threading.Thread(
+            name="%s zeo client networking thread" % self.client.__name__,
+            target=client_loop, args=(self.map,))
+        self.loop_thread.setDaemon(True)
+        self.loop_thread.start()
+
     def __repr__(self):
         return "<%s for %s>" % (self.__class__.__name__, self.addrlist)
 
@@ -84,7 +195,6 @@
         try:
             t = self.thread
             self.thread = None
-            conn = self.connection
         finally:
             self.cond.release()
         if t is not None:
@@ -94,10 +204,22 @@
             if t.isAlive():
                 log("CM.close(): self.thread.join() timed out",
                     level=logging.WARNING)
-        if conn is not None:
-            # This will call close_conn() below which clears self.connection
-            conn.close()
 
+        for fd, obj in self.map.items():
+            if obj is not self.trigger:
+                try:
+                    obj.close()
+                except:
+                    logging.getLogger(__name__+'.'+self.__class__.__name__
+                                      ).critical(
+                        "Couldn't close a dispatcher.",
+                        exc_info=sys.exc_info())
+
+        self.map.clear()
+        self.trigger.pull_trigger()
+        self.loop_thread.join(9)
+        self.trigger.close()
+
     def attempt_connect(self):
         """Attempt a connection to the server without blocking too long.
 

Modified: ZODB/trunk/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/connection.py	2010-01-29 22:47:19 UTC (rev 108652)
+++ ZODB/trunk/src/ZEO/zrpc/connection.py	2010-01-29 22:47:22 UTC (rev 108653)
@@ -21,10 +21,11 @@
 
 import traceback, time
 
+import ZEO.zrpc.trigger
+
 from ZEO.zrpc import smac
 from ZEO.zrpc.error import ZRPCError, DisconnectedError
 from ZEO.zrpc.marshal import Marshaller, ServerMarshaller
-from ZEO.zrpc.trigger import trigger
 from ZEO.zrpc.log import short_repr, log
 from ZODB.loglevels import BLATHER, TRACE
 import ZODB.POSException
@@ -35,142 +36,6 @@
 
 debug_zrpc = False
 
-##############################################################################
-# Dedicated Client select loop:
-client_timeout = 30.0
-client_timeout_count = 0 # for testing
-client_map = {}
-client_trigger = trigger(client_map)
-client_logger = logging.getLogger('ZEO.zrpc.client_loop')
-client_exit_event = threading.Event()
-client_running = False
-def client_exit():
-    global client_running
-    if client_running:
-        client_running = False
-        client_trigger.pull_trigger()
-        client_exit_event.wait(99)
-
-atexit.register(client_exit)
-
-def client_loop():
-    global client_running
-    client_running = True
-    client_exit_event.clear()
-
-    map = client_map
-    read = asyncore.read
-    write = asyncore.write
-    _exception = asyncore._exception
-    loop_failures = 0
-
-    while client_running and map:
-        try:
-
-            # The next two lines intentionally don't use
-            # iterators. Other threads can close dispatchers, causeing
-            # the socket map to shrink.
-            r = e = client_map.keys()
-            w = [fd for (fd, obj) in map.items() if obj.writable()]
-
-            try:
-                r, w, e = select.select(r, w, e, client_timeout)
-            except select.error, err:
-                if err[0] != errno.EINTR:
-                    if err[0] == errno.EBADF:
-
-                        # If a connection is closed while we are
-                        # calling select on it, we can get a bad
-                        # file-descriptor error.  We'll check for this
-                        # case by looking for entries in r and w that
-                        # are not in the socket map.
-
-                        if [fd for fd in r if fd not in map]:
-                            continue
-                        if [fd for fd in w if fd not in map]:
-                            continue
-
-                    raise
-                else:
-                    continue
-
-            if not client_running:
-                break
-
-            if not (r or w or e):
-                # The line intentionally doesn't use iterators. Other
-                # threads can close dispatchers, causeing the socket
-                # map to shrink.
-                for obj in map.values():
-                    if isinstance(obj, Connection):
-                        # Send a heartbeat message as a reply to a
-                        # non-existent message id.
-                        try:
-                            obj.send_reply(-1, None)
-                        except DisconnectedError:
-                            pass
-                global client_timeout_count
-                client_timeout_count += 1
-                continue
-
-            for fd in r:
-                obj = map.get(fd)
-                if obj is None:
-                    continue
-                read(obj)
-
-            for fd in w:
-                obj = map.get(fd)
-                if obj is None:
-                    continue
-                write(obj)
-
-            for fd in e:
-                obj = map.get(fd)
-                if obj is None:
-                    continue
-                _exception(obj)
-
-        except:
-            if map:
-                try:
-                    client_logger.critical('The ZEO client loop failed.',
-                                           exc_info=sys.exc_info())
-                except:
-                    pass
-
-                for fd, obj in map.items():
-                    if obj is client_trigger:
-                        continue
-                    try:
-                        obj.mgr.client.close()
-                    except:
-                        map.pop(fd, None)
-                        try:
-                            client_logger.critical(
-                                "Couldn't close a dispatcher.",
-                                exc_info=sys.exc_info())
-                        except:
-                            pass
-
-    client_exit_event.set()
-
-client_thread_lock = threading.Lock()
-client_thread = None
-def start_client_thread():
-    client_thread_lock.acquire()
-    try:
-        global client_thread
-        if client_thread is None:
-            client_thread = threading.Thread(target=client_loop, name=__name__)
-            client_thread.setDaemon(True)
-            client_thread.start()
-    finally:
-        client_thread_lock.release()
-
-#
-##############################################################################
-
 class Delay:
     """Used to delay response to client for synchronous calls.
 
@@ -679,7 +544,7 @@
     unlogged_exception_types = (ZODB.POSException.POSKeyError, )
 
     # Servers use a shared server trigger that uses the asyncore socket map
-    trigger = trigger()
+    trigger = ZEO.zrpc.trigger.trigger()
     call_from_thread = trigger.pull_trigger
 
     def __init__(self, sock, addr, obj, mgr):
@@ -724,9 +589,6 @@
     __super_init = Connection.__init__
     base_message_output = Connection.message_output
 
-    trigger = client_trigger
-    call_from_thread = trigger.pull_trigger
-
     def __init__(self, sock, addr, mgr):
         self.mgr = mgr
 
@@ -753,8 +615,10 @@
         self.replies_cond = threading.Condition()
         self.replies = {}
 
-        self.__super_init(sock, addr, None, tag='C', map=client_map)
-        client_trigger.pull_trigger()
+        self.__super_init(sock, addr, None, tag='C', map=mgr.map)
+        self.trigger = mgr.trigger
+        self.call_from_thread = self.trigger.pull_trigger
+        self.call_from_thread()
 
     def close(self):
         Connection.close(self)



More information about the Zodb-checkins mailing list