[Zope3-checkins] CVS: Zope3/lib/python/Zope/App/Tasks - ISchedule.py:1.1.2.1 ITask.py:1.1.2.1 ITimeEvent.py:1.1.2.1 Schedule.py:1.1.2.1 Task.py:1.1.2.1 TimeEvent.py:1.1.2.1 TimeEventProducer.py:1.1.2.1 __init__.py:1.1.2.1

Ulrich Eck ueck@net-labs.de
Mon, 9 Dec 2002 11:18:00 -0500


Update of /cvs-repository/Zope3/lib/python/Zope/App/Tasks
In directory cvs.zope.org:/tmp/cvs-serv530

Added Files:
      Tag: jack-e_scheduler_branch
	ISchedule.py ITask.py ITimeEvent.py Schedule.py Task.py 
	TimeEvent.py TimeEventProducer.py __init__.py 
Log Message:
Zope.App.Tasks:

Prototype Implementation of Jim's Schedule Design


=== Added File Zope3/lib/python/Zope/App/Tasks/ISchedule.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

"""Interfaces for schedule.
$Id: ISchedule.py,v 1.1.2.1 2002/12/09 16:17:59 jack-e Exp $
"""
from Interface import Interface

class ISchedule(Interface):
    """A Task Queue with Scheduling Policy
    """

    def schedule(task):
        """add a task to the Queue
        """

    def unschedule(task):
        """remove a task from the Queue
        """


class IScheduleControl(Interface):
    """A Task Queue Control Interface
    """

    def process(time):
        """Process due tasks
        """

    def getTaskTimes():
        """Return all datetimes for scheduled Tasks
        """    


=== Added File Zope3/lib/python/Zope/App/Tasks/ITask.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

"""Interfaces for task.
$Id: ITask.py,v 1.1.2.1 2002/12/09 16:17:59 jack-e Exp $
"""
from Interface import Interface

from Zope.Schema import Datetime, Line

class ITask(Interface):

    time = Datetime(description=u"The time at which the task should execute",
                    readonly=1,
                    required=1)

    principalId = Line(description=
                     u"The principalId that the task should be executed as.",
                     readonly=1,
                     required=1)
    
    def __call__():
        """Execute the task

        Exceptions raised by execute will be logged.

        If execution is sucessful, return None.

        A task may be returned to retry work after an error.
        """

    def schedule():
        """Schedule a new task

        If the execution of the task is sucessful (execute returns None),
        then schedule is called to optionally (re)schedule a task.

        None may be returned if no task needs to be scheduled.

        This method is used to implement repeating tasks.
        """




=== Added File Zope3/lib/python/Zope/App/Tasks/ITimeEvent.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

Revision information:
$Id: ITimeEvent.py,v 1.1.2.1 2002/12/09 16:17:59 jack-e Exp $
"""
from Interface.Attribute import Attribute

from Zope.Event.IEvent import IEvent

class ITimeEvent(IEvent):
    """The Base interface for Time Events"""

    time = Attribute("datetime object that stores the time for the Event")


=== Added File Zope3/lib/python/Zope/App/Tasks/Schedule.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

"""Implementation for schedule.
$Id: Schedule.py,v 1.1.2.1 2002/12/09 16:17:59 jack-e Exp $
"""
import time

from Persistence import Persistent
from Persistence.BTrees.OOBTree import OOBTree
from ZODB.POSException import ConflictError

from Zope.Security.SecurityManagement import newSecurityManager

from Zope.ComponentArchitecture import getService
from Zope.ContextWrapper import ContextMethod
from Zope.Exceptions import NotFoundError
from Zope.Event.ISubscriber import ISubscriber

from Zope.App.Tasks.ISchedule import ISchedule, IScheduleControl
from Zope.App.Tasks.ITask import ITask
from Zope.App.Tasks.ITimeEvent import ITimeEvent

# Implementation note:
#
# Internally, there's a BTree mapping time -> [tasks]


class Schedule(Persistent):

    __implements__ = (ISchedule, IScheduleControl, ISubscriber)

    def __init__(self):
        self.tasks = OOBTree()

    ## Implementation for ISchedule

    def schedule(self, task):
        tasks = self.tasks

        task_added = 0

        if ITask.isImplementedBy(task):
             time = task.time
             self.tasks[time] = self.tasks.get(time, ()) + (task, )
        else:
            # task does not Implement ITask
            raise TypeError

    def unschedule(self, task):
        tasks = self.tasks

        if ITask.isImplementedBy(task):
            time = task.time

            if time in tasks:
                tasklist = tuple([ t for t in tasks[time] if not t is task ])
                if not tasklist:
                    del tasks[time]
                else:
                    tasks[time] = tasklist
            else:
                # task was not in queue
                raise ValueError
        else:
            # task does not implement ITask
            raise TypeError


    def process(self, time):
        # get Authentication Service
        auth_svc = getService(self, "Authentication")

        # retrieve all tasklists up to time
        seq = self.tasks.items(None,time)

        # process tasks
        for time_key, tasklist in seq:

    
            del self.tasks[time_key]

            for task in tasklist:
                print "processing task"
                # create SecurityContext 
                try:
                    principal = auth_svc.getPrincipal(task.principalId)
                except NotFoundError:
                    # unkown Principal -> don't execute Task
                    # XXX Log this
                    print "principal not found"
                    continue

                old_sec_manager = newSecurityManager(principal)
                try:

                    try:
                       errtask = task()
                    
                       if errtask is None:
                           # give task a chance to reschedule
                           newtask = task.schedule()
                           if newtask is not None:
                              self.schedule(newtask)
                       else:
                          self.schedule(errtask)
                    except:
                       # prevent the scheduler from crashing 
                       # when bogus task raised exception
                       # need to Log Exception though
                       print "Exception in Task"
                       pass

                finally:
                    newSecurityManager(old_sec_manager)
    process = ContextMethod(process)


    def getTaskTimes(self):
        return self.tasks.keys()



    ## Implementation for ISubscriber

    def notify(self, event):
        if ITimeEvent.isImplementedBy(event):
            time = event.time
            self.process(time)
        else:
            raise TypeError
    notify = ContextMethod(notify)


    currentlySubscribed = False

    def subscribe(self, channel=None):
        if self.currentlySubscribed:
            raise RuntimeError, "already subscribed; please unsubscribe first"
        channel = self._getChannel(channel)
        channel.subscribe(self,ITimeEvent)
        self.currentlySubscribed = True
    subscribe = ContextMethod(subscribe)

    def unsubscribe(self, channel=None):
        if not self.currentlySubscribed:
            raise RuntimeError, "already subscribed; please unsubscribe first"
        channel = self._getChannel(channel)
        channel.unsubscribe(self,ITimeEvent)
        self.currentlySubscribed = False
    unsubscribe = ContextMethod(unsubscribe)


    def isSubscribed(self):
        return self.currentlySubscribed

    def _getChannel(self, channel):
        if channel is None:
            channel = getService(self,"Events")
        return channel
    _getChannel = ContextMethod(_getChannel)


=== Added File Zope3/lib/python/Zope/App/Tasks/Task.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################

"""Implementation of Task.

$Id: Task.py,v 1.1.2.1 2002/12/09 16:17:59 jack-e Exp $
"""
from Zope.ContextWrapper import ContextMethod

from Zope.App.Tasks.ITask import ITask


class Task:
    """Generic Task
    """

    __implements__ = ITask

    time = None

    principalId = None
 
    def __init__(self, time, principalId, func=None, *args, **kw):
        self.time = time
        self.principalId = principalId
        self._func = func
        self._args = args
        self._kw = kw

    #######################################################
    # Basic Implementation for Interface ITask

    def __call__(self):
        if callable(self._func):
            return apply(self._func, self._args, self._kw)

    __call__ = ContextMethod(__call__)

    def schedule(self):
        return None

    #
    #######################################################



=== Added File Zope3/lib/python/Zope/App/Tasks/TimeEvent.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""

Revision information:
$Id: TimeEvent.py,v 1.1.2.1 2002/12/09 16:17:59 jack-e Exp $
"""

__metaclass__ = type

from Zope.App.Tasks.ITimeEvent import ITimeEvent

_marker = object()

class TimeEvent:
    """an event that signals that some time has come"""

    __implements__ = ITimeEvent

    time = None

    def __init__(self, time):
        self.time = time


=== Added File Zope3/lib/python/Zope/App/Tasks/TimeEventProducer.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""
This module starts a new Thread for producing TimeEvents. 

$Id: TimeEventProducer.py,v 1.1.2.1 2002/12/09 16:17:59 jack-e Exp $
"""
from Transaction import get_transaction
from Zope.App.ZopePublication.ZopePublication import ZopePublication
from Zope.App.Traversing import traverse
from Zope.ComponentArchitecture import getService

from Zope.App.Tasks.TimeEvent import TimeEvent

from datetime import datetime, timedelta
import time


time_units = {'sec':1,
              'min':60,
              'hour':3600,
              'day':86400}


class TimeEventPublication:

    root_name = ZopePublication.root_name

    def __init__(self, db):
        self.db = db
        self._conn = None


    def beforeCall(self):
        get_transaction().begin()


    def getApplication(self):
        self._conn = conn = self.db.open()
        root = conn.root()
        app = root.get(self.root_name, None)
        
        if app is None:
            raise SystemError, "Zope Application Not Found"        

        return app


    def getEventService(self, app, path):
        place = traverse(app, path)
        event_service = getService(place,"Events")
        return event_service


    def publishTimeEvent(self, event_service, event_time=datetime.now()):
        event_service.publish(TimeEvent(event_time))
        # print "published TimeEvent %s" % event_time


    def afterCall(self):
        get_transaction().commit()
        self._conn.close()



def startTimeEventProducer(_zodb, *args):
    """TimeEventProducer producing regular TimeEvents

    A TimeEventProducer takes a resolution and unit and produces
    TimeEvents according to the specified resolution.
    """
    config = {}
    for arg in args:
        key,val = arg.split('=')
        config[key] = val

    resolution = int(config.get('resolution', 60))
    if resolution < 1:
        resolution = 1

    unit = config.get('unit', 'sec')
    if not unit in time_units.keys():
        # unkown unit
        return
    unit_val = time_units[unit]

    path = config.get('path', '/')

    
    # try to fire at first time feasible for unit
    now = datetime.now()
    if unit == 'sec':
        delta = datetime(now.year, now.month, now.day, now.hour, now.minute, now.second+1) - now
    elif unit == 'min':
        delta = datetime(now.year, now.month, now.day, now.hour, now.minute+1) - now
    elif unit == 'hour':
        delta = datetime(now.year, now.month, now.day, now.hour+1) - now
    elif unit == 'day':
        delta = datetime(now.year, now.month, now.day+1) - now
    else:
        delta = timedelta(0, 0, 0)

    time.sleep(delta.seconds + delta.microseconds/1000000)


    while 1:
        try:
            pub = TimeEventPublication(_zodb)
            app = pub.getApplication()
            pub.beforeCall()
            evt_svc = pub.getEventService(app, path)

            now = datetime.now()
            pub.publishTimeEvent(evt_svc, now)
            pub.afterCall()
        except:
            try:
                pub.afterCall()
            except:
                pass


        time.sleep(resolution*unit_val)

    # cleanup ??


=== Added File Zope3/lib/python/Zope/App/Tasks/__init__.py ===