[Zope-CVS] CVS: Packages/zasync/client/zasync - client.py:1.4 plugins.py:1.3

Gary Poster gary at zope.com
Thu Oct 21 15:30:00 EDT 2004


Update of /cvs-repository/Packages/zasync/client/zasync
In directory cvs.zope.org:/tmp/cvs-serv6303/client/zasync

Modified Files:
	client.py plugins.py 
Log Message:
Fix bug: if the tool disappeared, zasync was unable to recover because of a number of problems.  Most importantly, it was holding on to connections without closing them.  Fixed this bug and similar problems elsewhere.

Also more paranoid about starting up now, and about telling the manager about the available plugins when the tool has been found again.



=== Packages/zasync/client/zasync/client.py 1.3 => 1.4 ===
--- Packages/zasync/client/zasync/client.py:1.3	Tue Oct 19 12:45:44 2004
+++ Packages/zasync/client/zasync/client.py	Thu Oct 21 15:29:59 2004
@@ -74,11 +74,14 @@
 # helpers
 
 def housekeeping(interval=4):
+    global chores
+    log = logging.getLogger('zasync')
+    log.debug('housekeeping: performing %d chores', len(chores))
     for (f, args, kwargs) in chores:
         reactor.callLater(0, f, *args, **kwargs)
     reactor.callLater(interval, housekeeping, interval)
 
-def getRequestApp():
+def getRequestApp(application):
     response = HTTPResponse(stdout=sys.stdout)
     request = HTTPRequest(
         sys.stdin,
@@ -86,7 +89,7 @@
          'SERVER_PORT':'80',
          'REQUEST_METHOD':'GET'},
         response)
-    return app().__of__(RequestContainer(REQUEST=request))
+    return application.__of__(RequestContainer(REQUEST=request))
 
 def logException(msg='', log=None):
     global verbose_traceback
@@ -168,12 +171,31 @@
 
 def retryTool(path, delay):
     global retry_exponential_backoff, max_total_retry
-    global tool_retry_start, app
-    tool = None
+    global tool_retry_start, app, DB
+    application = None
     try:
         try:
-            tool = app().unrestrictedTraverse(path)
+            try:
+                sync = DB._storage.sync # important
+            except AttributeError:
+                pass
+            else:
+                sync()
+            application = app()
+            tool = application.unrestrictedTraverse(path)
+            get_transaction().commit()
+        except ConflictError:
+            logging.getLogger('zasync').info(
+                'Received ConflictError while trying to retry tool; retrying.')
+            get_transaction().abort()
+            reactor.callLater(delay, retryTool, path, delay)
+        except ClientDisconnected:
+            logging.getLogger('zasync').critical(
+                'ZEO server disconnected while trying to retry tool')
+            get_transaction().abort()
+            scheduleServerRetry(retryTool, path, delay)
         except (AttributeError, LookupError):
+            get_transaction().abort()
             delay = pow(delay, retry_exponential_backoff)
             diff = datetime.datetime.now() - tool_retry_start
             seconds = diff.seconds + 86400 * diff.days
@@ -193,14 +215,21 @@
             retries = tool_retries.pop(path)
             delay = 0
             interval = 0.25
+            # schedule all the past calls
+            found = False
             for call, args, kwargs in retries:
                 reactor.callLater(delay, call, *args, **kwargs)
                 delay += interval
+                if not found and call is _setPlugins:
+                    found = True
+            if not found:
+                # make sure tool (possibly new) knows about my plugins
+                setPlugins(path) # returns a deferred, which we ignore
             logging.getLogger('zasync').info(
                 'tool /%s found again; recommencing calls.', '/'.join(path))
     finally:
-        if tool is not None:
-            tool._p_jar.close()
+        if application is not None:
+            application._p_jar.close()
 
 def timeoutErrback(deferred):
     if not deferred.called:
@@ -212,11 +241,11 @@
     return value
 
 # pollZope schedules makeCall which schedules returnResult, possibly using
-# prepareDeferredCall if the result is a deferred.
+# prepareDeferredCall if the result is a deferred (and it usually is).
 def pollZope(path):
     "polls Zope to check for new calls, timeout changes, and expired calls"
     global active, app, DB
-    tool = None
+    application = None
     try:
         if is_connected():
             log = logging.getLogger('zasync')
@@ -228,8 +257,9 @@
                     pass
                 else:
                     sync()
+                application = app()
                 try:
-                    tool = app().unrestrictedTraverse(path)
+                    tool = application.unrestrictedTraverse(path)
                 except (AttributeError, LookupError):
                     scheduleToolRetry(path, pollZope, path)
                     return
@@ -281,8 +311,8 @@
         else:
             scheduleServerRetry(pollZope, path)
     finally:
-        if tool is not None:
-            tool._p_jar.close()
+        if application is not None:
+            application._p_jar.close()
 
 def makeCall(path, zopeDeferredId, name, args, kwargs, count=0):
     # zopeDeferredId is an id of a deferred in the asynchronous tool
@@ -346,12 +376,13 @@
     log = logging.getLogger('zasync')
     log.debug('prepareDeferredCall called for %s (%s)',
         zopeDeferredId, path)
-    tool = None
+    application = None
     try:
         if is_connected():
             try:
+                application = app()
                 try:
-                    tool = app().unrestrictedTraverse(path)
+                    tool = application.unrestrictedTraverse(path)
                 except (AttributeError, LookupError):
                     scheduleToolRetry(
                         path, prepareDeferredCall, path, zopeDeferredId, 
@@ -405,11 +436,11 @@
             scheduleServerRetry(
                 makeCall, path, zopeDeferredId, name, args, kwargs, count)
     finally:
-        if tool is not None:
-            tool._p_jar.close()
+        if application is not None:
+            application._p_jar.close()
 
 def returnResult(value, path, zopeDeferredId, error=False, count=0):
-    global active
+    global active, app
     if isinstance(value, failure.Failure):
         value.cleanFailure()
     log = logging.getLogger('zasync')
@@ -419,12 +450,13 @@
         del active[zopeDeferredId]
     except KeyError:
         pass
-    tool = None
+    application = None
     try:
         if is_connected():
             try:
+                application = app()
                 try:
-                    tool = getRequestApp().unrestrictedTraverse(path)
+                    tool = getRequestApp(application).unrestrictedTraverse(path)
                 except (AttributeError, LookupError):
                     scheduleToolRetry(
                         path, returnResult, value, path, zopeDeferredId, 
@@ -467,63 +499,155 @@
             scheduleServerRetry(
                 returnResult, value, path, zopeDeferredId, error, count)
     finally:
-        if tool is not None:
-            tool._p_jar.close()
+        if application is not None:
+            application._p_jar.close()
     return res
 
 def run(path=None):
     # to be called after config.initialize
-    global app, tool_path, plugins
+    d = setPlugins(path)
+    d.addCallbacks(handlePastCalls, stop)
+    d.addErrback(stop)
+    log = logging.getLogger('zasync')
+    log.debug('beginning reactor')
+    try:
+        reactor.run()
+    finally:
+        shutdown(None)
+
+def stop(ignored=None):
+    raise SystemExit()
+
+def setPlugins(path=None):
+    global tool_path
     if path is None:
         path = tool_path
-    path = tuple(path)
-    tool = app().unrestrictedTraverse(tool_path)
-    tool.setPlugins([(n, p['description']) for n, p in plugins.items()])
-    for attempt in range(max_conflict_resolution_attempts):
+    tool_path = path = tuple(path)
+    d = defer.Deferred()
+    reactor.callLater(0, _setPlugins, d, path)
+    return d
+
+def _setPlugins(deferred, path, count=0):
+    global app, plugins, max_conflict_resolution_attempts
+    log = logging.getLogger('zasync')
+    application = None
+    try:
         try:
-            info = [(d.getSignature(), d.key) for d in 
-                     tool.getAcceptedCalls()]
-            get_transaction().commit()
+            application = app()
+            try:
+                tool = application.unrestrictedTraverse(tool_path)
+            except (AttributeError, LookupError):
+                scheduleToolRetry(path, _setPlugins, deferred, path, count)
+                return
+            else:
+                tool.setPlugins(
+                    [(n, p['description']) for n, p in plugins.items()])
+                get_transaction().commit()
         except ConflictError:
-            pass
+            get_transaction().abort()
+            if count == max_conflict_resolution_attempts-1:
+                log.critical(
+                    "Too many conflicts trying to setPlugins!", exc_info=True)
+                deferred.errback(failure.Failure())
+            else:
+                count += 1
+                log.info("Conflict error %d trying to setPlugins", count)
+                reactor.callLater(count, _setPlugins, deferred, path, count)
+        except ClientDisconnected:
+            scheduleServerRetry(_setPlugins, deferred, path, count)
+            return
+        except (KeyboardInterrupt, SystemExit):
+            raise
+        except:
+            get_transaction().abort()
+            log.critical("Unexpected error: probably serious", exc_info=True)
+            deferred.errback(failure.Failure())
         else:
-            break
-    else:
-        raise RuntimeError("Too many conflicts")
-    for (name, args, kwargs), key in info:
+            deferred.callback(path)
+    finally:
+        if application is not None:
+            application._p_jar.close()
+
+def handlePastCalls(path, count=0):
+    global app
+    log = logging.getLogger('zasync')
+    application = None
+    try:
         try:
-            call_info = plugins[name]
-        except KeyError:
-            retry = False
-        else:
-            retry = call_info.get('retry', False)
-        if retry: # XXX don't retry if timed out
-            reactor.callLater(
-                0, makeCall, tool_path, key, name, args, kwargs)
+            application = app()
+            try:
+                tool = application.unrestrictedTraverse(path)
+            except (AttributeError, LookupError):
+                scheduleToolRetry(path, handlePastCalls, path, count)
+                return
+            else:
+                info = [(d.getSignature(), d.key) for d in 
+                         tool.getAcceptedCalls()]
+                get_transaction().commit()
+        except ConflictError:
+            get_transaction().abort()
+            if count==max_conflict_resolution_attempts-1:
+                log.critical(
+                    "Too many conflicts trying to handlePastCalls!", 
+                    exc_info=True)
+                raise SystemExit("Too many conflicts trying to handlePastCalls")
+            else:
+                count+=1
+                log.info("Conflict error %d trying to handlePastCalls", count)
+                reactor.callLater(count, handlePastCalls, path, count)
+        except ClientDisconnected:
+            scheduleServerRetry(handlePastCalls, path, count)
+            return
+        except (KeyboardInterrupt, SystemExit):
+            raise
+        except:
+            get_transaction().abort()
+            log.critical("Unexpected error: probably serious", exc_info=True)
+            deferred.errback(failure.Failure())
         else:
-            reactor.callLater(
-                0,
-                returnResult, 
-                failure.Failure(
-                    defer.TimeoutError(
-                        'zasync was disconnected (now reconnected)')),
-                tool_path,
-                key,
-                error=True)
-    tool._p_jar.close()
-    reactor.callLater(0, pollZope, tool_path)
-    reactor.callLater(0, housekeeping)
+            for (name, args, kwargs), key in info:
+                try:
+                    call_info = plugins[name]
+                except KeyError:
+                    retry = False
+                else:
+                    retry = call_info.get('retry', False)
+                if retry: # XXX don't retry if timed out
+                    reactor.callLater(
+                        0, makeCall, tool_path, key, name, args, kwargs)
+                else:
+                    reactor.callLater(
+                        0,
+                        returnResult, 
+                        failure.Failure(
+                            defer.TimeoutError(
+                                'zasync was disconnected (now reconnected)')),
+                        tool_path,
+                        key,
+                        error=True)
+            log.debug('scheduling first pollZope')
+            reactor.callLater(0, pollZope, path)
+            log.debug('scheduling first housekeeping')
+            reactor.callLater(0, housekeeping)
+    finally:
+        if application is not None:
+            application._p_jar.close()
+
+def shutdown(ignored=None):
+    global tool_path, app
+    application = None
     try:
-        reactor.run()
-    finally: 
         get_transaction().abort()
+        application = app()
         try:
-            tool = app().unrestrictedTraverse(tool_path)
-        except (AttributeError, KeyError, ConflictError, ClientDisconnected):
-            pass
-        else:
+            tool = application.unrestrictedTraverse(tool_path)
             tool.setPlugins(())
             get_transaction().commit()
-            tool._p_jar.close()
+        except (AttributeError, KeyError, ConflictError, ClientDisconnected):
+            get_transaction().abort()
+    finally:
+        if application is not None:
+            application._p_jar.close()
         logging.getLogger('zasync').critical('Shutting down')
         logging.shutdown()
+    return ignored


=== Packages/zasync/client/zasync/plugins.py 1.2 => 1.3 ===
--- Packages/zasync/client/zasync/plugins.py:1.2	Tue Oct 19 12:51:38 2004
+++ Packages/zasync/client/zasync/plugins.py	Thu Oct 21 15:29:59 2004
@@ -374,7 +374,7 @@
         'zope_exec: beginning worker thread %r', thread_id)
     global taskQueue, taskStatus, threadIds, callbacks, serverDown
     from zasync.client import app, max_conflict_resolution_attempts
-    root = None
+    application = None
     try:
         while 1: # keep on looking for tasks
             zopeDeferredTuple, homepath, actions = taskQueue.get() # blocks
@@ -391,7 +391,14 @@
                 try:
                     try:
                         deferred = None
-                        home = root = client.getRequestApp()
+                        try:
+                            sync = client.DB._storage.sync # important
+                        except AttributeError:
+                            pass
+                        else:
+                            sync()
+                        application = client.app()
+                        home = root = client.getRequestApp(application)
                         tool = root.unrestrictedTraverse(toolpath)
                         zopeDeferred = tool.getDeferred(zopeDeferredId)
                         user = zopeDeferred.getWrappedOwner()
@@ -607,8 +614,9 @@
                             callbacks.put((deferred, fail))
                         break
                 finally:
-                    if root is not None:
-                        root._p_jar.close()
+                    if application is not None:
+                        application._p_jar.close()
+                        application = None
             else: # too many conflict resolution attempts
                 logger.debug(
                     'zope_exec: worker %s got too many conflict errors: '



More information about the Zope-CVS mailing list