[Zope-CVS] CVS: Products/Scheduler - Task.py:1.1 Scheduler.py:1.12

Chris McDonough chrism@zope.com
Sat, 17 May 2003 14:46:38 -0400


Update of /cvs-repository/Products/Scheduler
In directory cvs.zope.org:/tmp/cvs-serv11728

Modified Files:
	Scheduler.py 
Added Files:
	Task.py 
Log Message:
Changes which allow the descheduling of a task by its task id rather than by its time.  Now, when a task is scheduled, it is assigned a task id (available as the taskid attribute of the task).  The task must now be descheduled using its task id rather than its time.  A tag was made before this change named "before_taskid" in CVS in case changes to the HEAD cause breakages in dependent code.


=== Added File Products/Scheduler/Task.py ===
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
# 
##############################################################################
"""
Task module.

$Id: Task.py,v 1.1 2003/05/17 18:46:07 chrism Exp $
"""

import time
import sys

import Globals
from AccessControl import ClassSecurityInfo
from Acquisition import Implicit
from Persistence import Persistent
from zLOG import LOG, INFO, BLATHER, ERROR, WARNING, PROBLEM
from ZPublisher.mapply import mapply
from ZPublisher.Publish import call_object, missing_name, dont_publish_class

from Products.Scheduler import IScheduledEvent

class Task(Implicit, Persistent):

    security = ClassSecurityInfo()
    security.declareObjectPublic()
    security.setDefaultAccess("allow")

    __implements__ = IScheduledEvent

    def __init__(self, description, when, path, interval=None, max_age=600,
                 max_retries=5, retry_backoff_time=60, filter_data=None):
        self.description = description
        self.when = when
        self.path = path
        self.interval = interval
        self.max_age = max_age
        self.max_retries = max_retries
        self.retry_backoff_time = retry_backoff_time
        self.retries = 0
        self.filter_data = filter_data

    def __call__(self):
        """Run the method on self.path with REQUEST['args'].

        On failure we reschedule, iff max_age and max_retries have not been
        exceeded."""

        now = int(time.time())
        # secondary max_age guard (in case of clock failure)
        if self.max_age and (now > self.when + self.max_age):
            msg = ('Task "%s" scheduled for %s failed due to exceeded maximum '
                   'age - not rescheduling'
                   % (self.getDescription(), pretty_time(self.when)))
            LOG('Scheduler', ERROR, msg)
            return 1

        try:

            REQUEST = getattr(self, 'REQUEST')
            result = self._runMethod(REQUEST)

        except:

            exc_name, exc_msg, exc_tb = sys.exc_info()
            synopsis = "'%s', %s" % (exc_name, exc_msg)
            exc_name = exc_msg = exc_tb = None

            # retry guard
            if self.max_retries and ((self.retries + 1) > self.max_retries):
                msg = ('Task "%s" scheduled for %s failed on exception'
                       '- not rescheduling due to exceeded maximum retries.'
                       '  Exception: %s'
                       % (self.description, pretty_time(self.when), synopsis))
                LOG('Scheduler', ERROR, msg, error=sys.exc_info())
                return 1
            
            then = now + self.retry_backoff_time

            # primary max_age guard
            if self.max_age and (then > self.when + self.max_age):
                msg = ('Task "%s" scheduled for %s failed on exception'
                       ' - not rescheduling due to exceeded maximum age'
                       '  Exception: %s'
                       % (self.description, pretty_time(self.when), synopsis))
                LOG('Scheduler', ERROR, msg, error=sys.exc_info())
                return 1

            # we schedule a retry if we get to here
            msg = ('Task "%s" scheduled for %s was not completed due '
                   'to an exception - retrying at %s.'
                   '  Exception: %s.'
                   % (self.description, pretty_time(self.when),
                      pretty_time(then), synopsis))
            LOG('Scheduler', PROBLEM, msg, error=sys.exc_info())
            self.retries = self.retries + 1
            return then, self

        # If we get here, we log success and return None
        msg = ('Task %s was run successfully with result %s' %
               (self.description, result))
        LOG('Scheduler', BLATHER, msg)
        return None

    def next(self):
        # having an interval means the task is recurring
        if self.interval:
            now = int(time.time())
            next_time = self.when + self.interval
            skipped = 0
            while next_time < now:
                # catch up!
                skipped  = skipped + 1
                next_time = next_time + self.interval
            if skipped:
                # report that we've lost some task runs
                msg = ('Task "%s" was skipped %s times due to a clock '
                       'failure or task server downtime'  % (self.description,
                                                              skipped))
                LOG('Scheduler', BLATHER, msg)
            return next_time, self.clone(next_time)
        return None # we do not have an interval

    def info(self):
        return ""

    def getDescription(self):
        return self.description

    def getInfo(self):
        return ''

    def getTime(self):
        return self.when

    def getFilterData(self):
        return self.filter_data

    def _runMethod(self, request):
        args = getattr(request, 'args', {})
        method = self.restrictedTraverse(self.path)
        result = mapply(method, args, request,
                        call_object, 1,
                        missing_name,
                        dont_publish_class,
                        request, bind=1)
        return result

    def clone(self, when):
        LOG('Scheduler', BLATHER, 'clone called')
        return self.__class__(description=self.description,
                              when=when,
                              path=self.path,
                              interval=self.interval,
                              max_age=self.max_age,
                              max_retries=self.max_retries,
                              retry_backoff_time=self.retry_backoff_time,
                              filter_data=self.filter_data)

def pretty_time(t):
    return time.ctime(t)

Globals.InitializeClass(Task)


=== Products/Scheduler/Scheduler.py 1.11 => 1.12 ===
--- Products/Scheduler/Scheduler.py:1.11	Tue May 13 19:32:21 2003
+++ Products/Scheduler/Scheduler.py	Sat May 17 14:46:07 2003
@@ -15,21 +15,18 @@
 
 $Id$
 """
-import os, time, sys
-from types import FloatType, IntType
+import os, time, sys, random
+from types import FloatType, IntType, StringType
 
 import Globals
-from BTrees import IOBTree
+from BTrees import IOBTree, OOBTree
 from Persistence import Persistent
 from ExtensionClass import Base
-from Acquisition import Implicit
 from Acquisition import aq_base
 from AccessControl import ClassSecurityInfo
 
 from OFS.SimpleItem import SimpleItem
 from OFS.PropertyManager import PropertyManager
-from ZPublisher.mapply import mapply
-from ZPublisher.Publish import call_object, missing_name, dont_publish_class
 
 from Products.PageTemplates.PageTemplateFile import PageTemplateFile
 from Products.Event.ISubscriptionAware import ISubscriptionAware
@@ -39,6 +36,7 @@
 from Products.Scheduler import IScheduledEvent
 from Products.Scheduler import IDescheduledEvent
 from Products.Scheduler import ITimeEvent
+from Products.Scheduler.Task import Task
 from SchedulerPermissions import *
 
 from zLOG import LOG, PROBLEM, ERROR, BLATHER
@@ -79,7 +77,8 @@
         self.id = id
         self.title = title
         self.subscribed = []
-        self.tasks = IOBTree.IOBTree()
+        self.tasks = OOBTree.OOBTree()
+        self.times = IOBTree.IOBTree()
         if filter_data:
             self.filter_data = filter_data
         else:
@@ -95,7 +94,7 @@
         event_type and filter match the arguments provided to
         ISubscribable.subscribe.
         """
-        subscribable = getattr(subscribable, 'aq_base', subscribable)
+        subscribable = aq_base(subscribable)
         self.subscribed.append((subscribable, event_type, filter))
         self._p_changed = 1
     
@@ -106,7 +105,7 @@
         the deleted subscription, rather than, necessarily, the
         arguments provided to ISubscribable.unsubscribe.
         """
-        subscribable = getattr(subscribable, 'aq_base', subscribable)
+        subscribable = aq_base(subscribable)
         self.subscribed.remove((subscribable, event_type, filter))
         self._p_changed = 1
 
@@ -178,7 +177,7 @@
         elif IDescheduledEvent.isImplementedBy(event):
             # this is a deschedule event, we want to deschedule this event,
             # but we don't want to actually perform any tasks.
-            self.deschedule(event.getTime())
+            self.deschedule(event.getTaskId())
             return
         else:
             # We don't know what to do, so we punt.
@@ -189,7 +188,8 @@
                 'value.' % event)
 
         for this_time, this_task in self.getPendingTasks(t):
-            self.deschedule(this_time) # deschedule the task
+            taskid = this_task.taskid
+            self.deschedule(taskid) # deschedule the task
             try:
                 this_task = this_task.__of__(self)
                 status = this_task() # perform the task
@@ -214,7 +214,7 @@
             try:
                 next_time, next_task = status
                 # make sure we don't try to store an acquisition wrapper
-                next_task = getattr(next_task, 'aq_base', next_task)
+                next_task = aq_base(next_task)
             except TypeError:
                 next_time, next_task = None, None
             
@@ -233,31 +233,50 @@
         when = int(when)
         # XXX:  We should really be storing paths and dereferencing them
         #       here;  re-wrapping the task object in the schedule is icky!
-        tasks = self.tasks.items(None, when) #min, max
-        return [ ( x[0], aq_base( x[1] ).__of__( self ) ) for x in tasks ]
+        timelist = self.times.items(None, when) # min, max
+        l = []
+        for t, taskid_list in timelist:
+            for taskid in taskid_list:
+                task = self.tasks.get(taskid)
+                task = aq_base(task).__of__(self)
+                l.append((t, task))
+        return l
                 
     security.declareProtected(VIEW_SCHEDULE_PERM, 'getPendingTaskInfo')
     def getPendingTaskInfo(self, when=None):
         """
             Return a sequence of mappings for use by UI.
         """
-        return [(x[0], {'when': x[1].getTime(),
-                        'info': x[1].getInfo(),
-                        'description': x[1].getDescription()})
-                for x in self.getPendingTasks(when)]
+        l = []
+        for key, task in self.getPendingTasks(when):
+            d = { 'when':task.getTime(), 'info':task.getInfo(),
+                  'description':task.getDescription(),
+                  'taskid':task.taskid }
+            l.append((key, d))
+        return l
 
+    def new_task_id(self, time):
+        return '%010i%010i' % (time, random.randint(0, sys.maxint - 1))
+    
     security.declareProtected(CHANGE_SCHEDULE_PERM, 'schedule')
     def schedule(self, time, task):
+        task = aq_base(task)
+        taskid = self.new_task_id(time)
+        while not self.tasks.insert(taskid, task):
+            taskid = self.new_task_id(time)
+        task.taskid = taskid
         time = int(time)
-        while not self.tasks.insert(time, task):
-            # give the task as close as possible time to its requested
-            # runtime
-            time = time + 1
+        l = self.times.get(time, [])
+        l.append(taskid)
+        self.times[time] = l
 
     security.declareProtected(CHANGE_SCHEDULE_PERM, 'deschedule')
-    def deschedule(self, time):
-        """ deschedule the task by removing from the tasks BTree """
-        del self.tasks[time]
+    def deschedule(self, taskid):
+        """ deschedule the task by removing from the tasks and times BTrees """
+        time = int(taskid[:10])
+        del self.tasks[taskid]
+        l = self.times.get(time)
+        l.remove(taskid)
 
     security.declareProtected(CHANGE_SCHEDULE_PERM, 'manage_scheduleTask')
     def manage_scheduleTask(self, description, when, path, interval,
@@ -280,11 +299,11 @@
             return self.manage_current_tasks(self, REQUEST)
 
     security.declareProtected(CHANGE_SCHEDULE_PERM, 'manage_descheduleTask')
-    def manage_descheduleTask(self, when, REQUEST=None):
+    def manage_descheduleTask(self, taskids, REQUEST=None):
         """ """
-        if isinstance(when, IntType):
-            when = [when]
-        for t in when:
+        if isinstance(taskids, StringType):
+            taskids = [taskids]
+        for t in taskids:
             try:
                 # this try-except is in case the form is submitted after
                 # notify has kicked a task out of the queue.
@@ -302,6 +321,10 @@
     def now(self):
         return int(time.time())
 
+    security.declarePublic('maxTime')
+    def maxTime(self):
+        return sys.maxint
+
     security.declareProtected(MGMT_SCREEN_PERM, 'manage_current_tasks')
     manage_current_tasks = PageTemplateFile(
         'www/manage_current_tasks', globals(),__name__='manage_current_tasks'
@@ -312,143 +335,6 @@
        'www/manage_schedule_tasks',globals(),__name__='manage_schedule_tasks'
         )
 
-class Task(Implicit, Persistent):
-
-    security = ClassSecurityInfo()
-    security.declareObjectPublic()
-    security.setDefaultAccess("allow")
-
-    __implements__ = IScheduledEvent
-
-    def __init__(self, description, when, path, interval=None, max_age=600,
-                 max_retries=5, retry_backoff_time=60, filter_data=None):
-        self.description = description
-        self.when = when
-        self.path = path
-        self.interval = interval
-        self.max_age = max_age
-        self.max_retries = max_retries
-        self.retry_backoff_time = retry_backoff_time
-        self.retries = 0
-        self.filter_data = filter_data
-
-    def __call__(self):
-        """Run the method on self.path with REQUEST['args'].
-
-        On failure we reschedule, iff max_age and max_retries have not been
-        exceeded."""
-
-        now = int(time.time())
-        # secondary max_age guard (in case of clock failure)
-        if self.max_age and (now > self.when + self.max_age):
-            msg = ('Task "%s" scheduled for %s failed due to exceeded maximum '
-                   'age - not rescheduling'
-                   % (self.getDescription(), pretty_time(self.when)))
-            LOG('Scheduler', ERROR, msg)
-            return 1
-
-        try:
-
-            REQUEST = getattr(self, 'REQUEST')
-            result = self._runMethod(REQUEST)
-
-        except:
-
-            exc_name, exc_msg, exc_tb = sys.exc_info()
-            synopsis = "'%s', %s" % (exc_name, exc_msg)
-            exc_name = exc_msg = exc_tb = None
-
-            # retry guard
-            if self.max_retries and ((self.retries + 1) > self.max_retries):
-                msg = ('Task "%s" scheduled for %s failed on exception'
-                       '- not rescheduling due to exceeded maximum retries.'
-                       '  Exception: %s'
-                       % (self.description, pretty_time(self.when), synopsis))
-                LOG('Scheduler', ERROR, msg, error=sys.exc_info())
-                return 1
-            
-            then = now + self.retry_backoff_time
-
-            # primary max_age guard
-            if self.max_age and (then > self.when + self.max_age):
-                msg = ('Task "%s" scheduled for %s failed on exception'
-                       ' - not rescheduling due to exceeded maximum age'
-                       '  Exception: %s'
-                       % (self.description, pretty_time(self.when), synopsis))
-                LOG('Scheduler', ERROR, msg, error=sys.exc_info())
-                return 1
-
-            # we schedule a retry if we get to here
-            msg = ('Task "%s" scheduled for %s was not completed due '
-                   'to an exception - retrying at %s.'
-                   '  Exception: %s.'
-                   % (self.description, pretty_time(self.when),
-                      pretty_time(then), synopsis))
-            LOG('Scheduler', PROBLEM, msg, error=sys.exc_info())
-            self.retries = self.retries + 1
-            return then, self
-
-        # If we get here, we log success and return None
-        msg = ('Task %s was run successfully with result %s' %
-               (self.description, result))
-        LOG('Scheduler', BLATHER, msg)
-        return None
-
-    def next(self):
-        # having an interval means the task is recurring
-        if self.interval:
-            now = int(time.time())
-            next_time = self.when + self.interval
-            skipped = 0
-            while next_time < now:
-                # catch up!
-                skipped  = skipped + 1
-                next_time = next_time + self.interval
-            if skipped:
-                # report that we've lost some task runs
-                msg = ('Task "%s" was skipped %s times due to a clock '
-                       'failure or task server downtime'  % (self.description,
-                                                              skipped))
-                LOG('Scheduler', BLATHER, msg)
-            return next_time, self.clone(next_time)
-        return None # we do not have an interval
-
-    def info(self):
-        return ""
-
-    def getDescription(self):
-        return self.description
-
-    def getInfo(self):
-        return ''
-
-    def getTime(self):
-        return self.when
-
-    def getFilterData(self):
-        return self.filter_data
-
-    def _runMethod(self, request):
-        args = getattr(request, 'args', {})
-        method = self.restrictedTraverse(self.path)
-        result = mapply(method, args, request,
-                        call_object, 1,
-                        missing_name,
-                        dont_publish_class,
-                        request, bind=1)
-        return result
-
-    def clone(self, when):
-        LOG('Scheduler', BLATHER, 'clone called')
-        return self.__class__(description=self.description,
-                              when=when,
-                              path=self.path,
-                              interval=self.interval,
-                              max_age=self.max_age,
-                              max_retries=self.max_retries,
-                              retry_backoff_time=self.retry_backoff_time,
-                              filter_data=self.filter_data)
-
 class Filter(Base, Persistent):
     def __init__(self, key):
         self.key = key
@@ -458,9 +344,6 @@
         if key and key() == self.key:
             return 1
         return 0
-
-def pretty_time(t):
-    return time.ctime(t)
 
 manage_addSchedulerForm = PageTemplateFile(
         'www/addScheduler',