[Zope-CVS] CVS: Products/QueueCatalog - CatalogEventQueue.py:1.1 Delegation.dtml:1.1 Processor.py:1.1 QueueCatalog.gif:1.1 QueueCatalog.py:1.1 Statistics.dtml:1.1 __init__.py:1.1 add.pt:1.1

Jim Fulton jim@zope.com
Mon, 29 Jul 2002 10:45:26 -0400


Update of /cvs-repository/Products/QueueCatalog
In directory cvs.zope.org:/tmp/cvs-serv21302

Added Files:
	CatalogEventQueue.py Delegation.dtml Processor.py 
	QueueCatalog.gif QueueCatalog.py Statistics.dtml __init__.py 
	add.pt 
Log Message:
initial queued catalog implementation

=== Added File Products/QueueCatalog/CatalogEventQueue.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""
$Id: CatalogEventQueue.py,v 1.1 2002/07/29 14:45:25 jim Exp $
"""

from Persistence import Persistent
from ZODB.POSException import ConflictError
from time import time as current_wall_time

REMOVED       = 0
ADDED         = 1
CHANGED       = 2
CHANGED_ADDED = 3
EVENT_TYPES = (REMOVED, CHANGED, ADDED, CHANGED_ADDED)
antiEvent = {REMOVED:       ADDED,
             ADDED:         REMOVED,
             CHANGED:       CHANGED,
             CHANGED_ADDED: CHANGED_ADDED,
             }.get

ADDED_EVENTS = (CHANGED, ADDED, CHANGED_ADDED)


class CatalogEventQueue(Persistent):
    """Event queue for catalog events

    This is a rather odd queue. It organizes events by object, where
    objects are identified by uids, which happen to be string paths.

    One way that this queue is extremely odd is that it really only
    keeps track of the last event for an object. This is because we
    really only *care* about the last event for an object.

    There are four types of events:

    ADDED         -- An object was added to the catalog

    CHANGED       -- An object was changed

    REMOVED       -- An object was removed from the catalog

    CHANGED_ADDED -- An object was added and subsequently changed.
                     This event is a consequence of the queue implementation.
                     
    Note that, although we only keep track of the most recent
    event, there are rules for how the most recent event can be
    updated:

    - It is illegal to update an ADDED, CHANGED, or CHANGED_ADDED
      event with an ADDED event.

    - It is illegal to update a REMOVED event with a CHANGED event.

    We have a problem because applications don't really indicate
    whether they are adding or just updating.  We deduce add
    events by examining the catalog and event queue states.

    Also note that, when events are applied to the catalog, events may
    have no effect.

    - If an object is in the catalog, ADDED events are equivalent to
      CHANGED events.

    - If an object is not in the catalog, REMOVED and CHANGED events
      have no effect.

    If we undo a transaction, we generate an anti-event. The
    anti-event of ADDED is REMOVED, of REMOVED is ADDED, of CHANGED
    is CHANGED, and of CHANGED_ADDED is CHANGED_ADDED.

    Note that these rules represent heuristics that attempt to provide
    efficient and sensible behavior for most cases. They are not
    "correct" in any strict sense; they make a decision even in cases
    that may not seem handleable. For example, consider a sequence of
    transactions:

      T1 adds an object
      T2 removes the object
      T3 adds the object
      T4 processes the queue
      T5 undoes T1

    It's not clear what should be done in this case. We decide to
    generate a remove event, even though a later transaction added the
    object again. Is this correct? It's hard to say. The decision we
    make is not horrible and it allows us to provide a very efficient
    implementation.  See the unit tests for other scenarios. Feel
    free to think of cases for which our decisions are unacceptably
    wrong and write unit tests for these cases.

    There are two kinds of transactions that affect the queue:

    - Application transactions always add or modify events. They never
      remove events.

    - Queue processing transactions always remove events.
    
    """

    def __init__(self):

        # Mapping from uid -> (generation, event type)
        self._data = {}

    def __nonzero__(self):
        return not not self._data

    def __len__(self):
        return len(self._data)
        
    def update(self, uid, etype):
        assert etype in EVENT_TYPES
        data = self._data
        current = data.get(uid)
        if current is not None:
            generation, current = current
            if current in ADDED_EVENTS and etype == ADDED:
                raise TypeError("Attempt to add an object that is already "
                                "in the catalog")
            if current == REMOVED and etype == CHANGED:
                raise TypeError("Attempt to change an object that has "
                                "been removed")

            # A change to a pending add must still be applied as an
            # add, so remember both.
            if current in (ADDED, CHANGED_ADDED) and etype == CHANGED:
                etype = CHANGED_ADDED

                
        else:
            generation = 0

        data[uid] = generation+1, etype

        self._p_changed = 1

    def getEvent(self, uid):
        state = self._data.get(uid)
        if state is not None:
            state = state[1]
        return state

    def process(self):
        data = self._data
        self._data = {}
        return data

    def _p_resolveConflict(self, oldstate, committed, newstate):
        # Apply the changes made in going from old to newstate to
        # committed

        # Note that in the case of undo, the olddata is the data for
        # the transaction being undone and newdata is the data for the
        # transaction previous to the undone transaction.

        # Committed is always the currently committed data.

        oldstate_data  =  oldstate['_data']
        committed_data = committed['_data']
        newstate_data  =  newstate['_data']

        # Merge newstate changes into committed
        for uid, new in newstate_data.items():

            # Remove this uid's event from old, whether or not it
            # changed, so that the loop below only sees events that
            # are missing from newstate.
            old = oldstate_data.get(uid)
            if old is not None:
                del oldstate_data[uid]

            if new == old:
                # nothing changed
                continue

            if old is not None:
                # got a repeat event
                if new[0] < old[0]:
                    # This was an undo, so give the event the undo
                    # time and convert to an anti event of the old
                    # (undone) event.
                    new = (0, antiEvent(old[1]))
                elif new[1] == ADDED:
                    raise ConflictError

            # Check against the current value. Either we want a
            # different event, in which case we give up, or we
            # do nothing.
            current = committed_data.get(uid)
            if current is not None:
                if current[1] != new[1]:
                    # This is too complicated, bail
                    raise ConflictError
                # nothing to do
                continue

            committed_data[uid] = new

        # Now handle remaining events in old that weren't in new.
        # These *must* be undone events!
        for uid, old in oldstate_data.items():
            new = (0, antiEvent(old[1]))
            
            # See above
            current = committed_data.get(uid)
            if current is not None:
                if current[1] != new[1]:
                    # This is too complicated, bail
                    raise ConflictError
                # nothing to do
                continue

            committed_data[uid] = new

        return {'_data': committed_data}

__doc__ = CatalogEventQueue.__doc__ + __doc__



# Old worries

# We have a problem. We have to make sure that we don't lose too
# much history to undo, but we don't want to retain the entire
# history. We certainly don't want to execute the entire history
# when we execute a trans.
#
# Baah, no worry, if we undo in a series of unprocessed events, we
# simply restore the old event, which we have in the old state.
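The coalescing rules in the CatalogEventQueue docstring can be modelled without ZODB. The following is a minimal sketch, assuming a plain in-memory dict instead of a Persistent object; `EventQueueSketch` and `get_event` are illustrative names, not part of the product.

```python
# Event codes mirror the constants defined in CatalogEventQueue.py.
REMOVED, ADDED, CHANGED, CHANGED_ADDED = 0, 1, 2, 3
ADDED_EVENTS = (CHANGED, ADDED, CHANGED_ADDED)


class EventQueueSketch:
    """Keep only the latest event per uid (hypothetical in-memory model)."""

    def __init__(self):
        self._data = {}  # uid -> (generation, event type)

    def update(self, uid, etype):
        current = self._data.get(uid)
        if current is None:
            generation = 0
        else:
            generation, last = current
            if last in ADDED_EVENTS and etype == ADDED:
                raise TypeError("object is already in the catalog")
            if last == REMOVED and etype == CHANGED:
                raise TypeError("object has been removed")
            # A change to a pending add must still be applied as an add.
            if last in (ADDED, CHANGED_ADDED) and etype == CHANGED:
                etype = CHANGED_ADDED
        self._data[uid] = (generation + 1, etype)

    def get_event(self, uid):
        state = self._data.get(uid)
        return None if state is None else state[1]

    def process(self):
        # Hand back every pending event and start over with an empty queue.
        data, self._data = self._data, {}
        return data
```

An add followed by a change collapses into a single CHANGED_ADDED entry, and only the generation counter records how many updates were folded together.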


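The merge performed by `_p_resolveConflict` can likewise be tried out on plain dicts. This is a simplified sketch, not the product's code: `resolve`, `_apply` and the local `ConflictError` class are hypothetical stand-ins (the real exception comes from `ZODB.POSException`), each state is a dict mapping uid to `(generation, event type)`, and entries that the transaction left unchanged are skipped so they are not mistaken for undone events.

```python
# Event codes and anti-events mirror CatalogEventQueue.py.
REMOVED, ADDED, CHANGED, CHANGED_ADDED = 0, 1, 2, 3
ANTI_EVENT = {REMOVED: ADDED, ADDED: REMOVED,
              CHANGED: CHANGED, CHANGED_ADDED: CHANGED_ADDED}


class ConflictError(Exception):
    """Hypothetical stand-in for ZODB.POSException.ConflictError."""


def _apply(committed, uid, event):
    # Accept the event if committed has nothing for uid; if it already
    # holds the same event type, keep it; otherwise give up.
    current = committed.get(uid)
    if current is None:
        committed[uid] = event
    elif current[1] != event[1]:
        raise ConflictError(uid)


def resolve(old, committed, new):
    """Merge one transaction's old -> new changes into committed.

    Copies are made, so the arguments are left untouched.
    """
    old = dict(old)
    merged = dict(committed)
    for uid, event in new.items():
        previous = old.pop(uid, None)
        if event == previous:
            continue  # this transaction did not touch uid
        if previous is not None:
            if event[0] < previous[0]:
                # Generation went backwards: an undo, so emit the
                # anti-event of the undone event.
                event = (0, ANTI_EVENT[previous[1]])
            elif event[1] == ADDED:
                raise ConflictError(uid)
        _apply(merged, uid, event)
    # Whatever is left in old is missing from new: treat it as undone.
    for uid, previous in old.items():
        _apply(merged, uid, (0, ANTI_EVENT[previous[1]]))
    return merged
```

Any disagreement between the committed event and the merged one becomes a ConflictError rather than a guess, which matches the "too complicated, bail" policy in the original.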


=== Added File Products/QueueCatalog/Delegation.dtml ===
<dtml-var manage_page_header>
<dtml-var manage_tabs>

<form action="manage_setDelegation">

Location: <input name="location"
                 value="&dtml-manage_getLocation;"
                 />
<br>
<input type="submit" value="Change" />
</form>

<dtml-var manage_page_footer>


=== Added File Products/QueueCatalog/Processor.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""
$Id: Processor.py,v 1.1 2002/07/29 14:45:25 jim Exp $
"""

import sys
import thread
import Zope
from time import sleep
from zLOG import LOG, ERROR, PANIC

class Processor:
    """Simple thread that processes queued catalog events
    """

    def __init__(self, queue_catalog_paths, interval=60):
        self._queue_catalog_paths = queue_catalog_paths
        self._interval = interval
        thread.start_new_thread(self.live, ())

    def live(self):
        while 1:
            sleep(self._interval)
            for queue_catalog_path in self._queue_catalog_paths:
                try:
                    application = Zope.app()
                except:
                    LOG('QueuedCatalog', PANIC,
                        "Couldn't connect to database",
                        error=sys.exc_info())
                    break # No point in doing any more paths right now
                else:

                    try:
                        queue_catalog = application.unrestrictedTraverse(
                            queue_catalog_path)
                        queue_catalog.process()
                    except:
                        LOG('QueuedCatalog', ERROR, 'Queue processing failed',
                            error=sys.exc_info())

                    application._p_jar.close()


__doc__ = Processor.__doc__ + __doc__
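The Processor class is a sleep-then-process loop on a background thread. A rough modern equivalent, assuming Python 3's `threading` module rather than the low-level `thread` module used above, might look like this; `PollingProcessor` is a hypothetical name, and its callbacks stand in for opening a database connection and calling `process()` on each QueueCatalog path.

```python
import threading


class PollingProcessor:
    """Run each callback every `interval` seconds on a daemon thread."""

    def __init__(self, callbacks, interval=60.0):
        self._callbacks = list(callbacks)
        self._interval = interval
        self._stopping = threading.Event()
        self.errors = []  # exceptions raised by callbacks, for inspection
        self._thread = threading.Thread(target=self._live, daemon=True)
        self._thread.start()

    def _live(self):
        # Event.wait() returns False on timeout, so it doubles as an
        # interruptible sleep between rounds.
        while not self._stopping.wait(self._interval):
            for callback in self._callbacks:
                try:
                    callback()
                except Exception as exc:
                    # One failing queue must not stop the others; the
                    # original logs the error via zLOG instead.
                    self.errors.append(exc)

    def stop(self):
        """Ask the thread to finish and wait for it."""
        self._stopping.set()
        self._thread.join()
```

Using `Event.wait()` as the sleep lets `stop()` end the loop without waiting out a full interval, which the `while 1` loop above has no way to do.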



=== Added File Products/QueueCatalog/QueueCatalog.gif ===
  <Binary-ish file>

=== Added File Products/QueueCatalog/QueueCatalog.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################
"""
$Id: QueueCatalog.py,v 1.1 2002/07/29 14:45:25 jim Exp $
"""

from zExceptions import Unauthorized
from ExtensionClass import Base
from OFS.SimpleItem import SimpleItem
from AccessControl.SecurityManagement import getSecurityManager
from AccessControl.SecurityInfo import ClassSecurityInformation
from BTrees.OOBTree import OOBTree
from time import time
from CatalogEventQueue import CatalogEventQueue, EVENT_TYPES, ADDED_EVENTS
from CatalogEventQueue import ADDED, CHANGED, CHANGED_ADDED, REMOVED
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Globals import HTMLFile
from Acquisition import Implicit

StringType = type('')

_zcatalog_methods = {
    'catalog_object': 1,
    'uncatalog_object': 1,
    'uniqueValuesFor': 1,
    'getpath': 1,
    'getrid': 1,
    'getobject': 1,
    'schema': 1,
    'indexes': 1,
    'index_objects': 1,
    'searchResults': 1,
    '__call__': 1,
    }

_zcatalog_method = _zcatalog_methods.has_key

_views = {}

class QueueCatalog(Implicit, SimpleItem):
    """Queued ZCatalog (Proxy)

    A QueueCatalog delegates most requests to a ZCatalog that is named
    as part of the QueueCatalog configuration.

    Requests to catalog or uncatalog objects are queued. They must be
    processed by a separate process (or thread). The queuing provides
    several benefits:

    - Content-management operations, performed by humans, complete
      much faster, thus making the content-management system more
      efficient for its users.

    - Catalog updates are batched, which makes indexing much more
      efficient.

    - Indexing is performed by a single thread, allowing more
      efficient catalog document generation and avoiding conflict
      errors during indexing.

    - When used with ZEO, indexing might be performed on the same
      machine as the storage server, making updates faster.
      
    """

    def __init__(self, buckets=1009):
        self._buckets = buckets
        self._location = None
        self._queues = [CatalogEventQueue() for i in range(buckets)]

    def setLocation(self, location):
        if self._location is not None:
            try:
                self.process()
            except TypeError:
                # clear the queues
                self.__init__(self._buckets)

        self._location = location
        

    def getZCatalog(self, method=''):
        
        ZC = getattr(self, '_v_ZCatalog', None)
        if ZC is None:
            if self._location is None:
                raise TypeError(
                    "This QueueCatalog hasn't been "
                    "configured with a ZCatalog location."
                    )
            ZC = self.unrestrictedTraverse(self._location)
            self._v_ZCatalog = ZC

        security_manager = getSecurityManager()

        if not security_manager.validateValue(ZC):
            raise Unauthorized(self._location, ZC)

        if method:
            if not _zcatalog_method(method):
                raise AttributeError(method)
            ZC = getattr(ZC, method)
            if not security_manager.validateValue(ZC):
                raise Unauthorized(name=method)
                

        return ZC

    def __getattr__(self, name):
        # The original object must be wrapped, but self isn't, so
        # we return a special object that will do the attribute access
        # on a wrapped object. 
        if _zcatalog_method(name):
            return AttrWrapper(name)

        raise AttributeError(name)

    def _update(self, uid, etype):
        if uid[:1] != '/':
            raise TypeError("uid must start with '/'")

        self._queues[hash(uid) % self._buckets].update(uid, etype)
        
    def catalog_object(self, obj, uid=None):

        # Make sure the current context is allowed to do this:
        catalog_object = self.getZCatalog('catalog_object')
        
        if uid is None:
            uid = '/'.join(obj.getPhysicalPath())
        elif type(uid) is not StringType:
            uid = '/'.join(uid)

        # The ZCatalog API doesn't allow us to distinguish between
        # adds and updates, so we have to try to figure this out
        # ourselves.

        # There's a risk of a race here. What if there is a previously
        # unprocessed add event? If so, then this should be a changed
        # event. If we undo this transaction later, we'll generate a
        # remove event, when we should generate an add changed event.
        # To avoid this, we need to make sure we see consistent values
        # of the event queue. We also need to avoid resolving
        # (non-undo) conflicts of add events. This will slow things
        # down a bit, but adds should be relatively infrequent. 

        # Now, try to decide if the catalog has the uid (path).

        catalog = self.getZCatalog()

        if cataloged(catalog, uid):
            event = CHANGED
        else:
            # Looks like we should add, but maybe there's already a
            # pending add event. We'd better check the event queue:
            
            if (self._queues[hash(uid) % self._buckets].getEvent(uid) in
                ADDED_EVENTS):
                event = CHANGED
            else:
                event = ADDED
        
        self._update(uid, event)

    def uncatalog_object(self, uid):

        # Make sure the current context is allowed to do this:
        self.getZCatalog('uncatalog_object')

        if type(uid) is not StringType:
            uid = '/'.join(uid)

        self._update(uid, REMOVED)

    def process(self):
        "Process pending events"
        catalog = self.getZCatalog()
        for queue in filter(None, self._queues):
            events = queue.process()
            for uid, (generation, event) in events.items():
                if event == REMOVED:
                    if cataloged(catalog, uid):
                        catalog.uncatalog_object(uid)
                else:
                    # add or change

                    if event == CHANGED and not cataloged(catalog, uid):
                        continue
                    
                    obj = self.unrestrictedTraverse(uid)
                    catalog.catalog_object(obj, uid)

    # Provide web pages. It would be nice to use views, but Zope 2.6
    # just isn't ready for views. :( In particular, we'd have to fake
    # out the PageTemplateFiles in some brittle way to make them do
    # the right thing. :(

    manage_Delegation = HTMLFile('Delegation', globals())

    def manage_getLocation(self):
        return self._location or ''

    def manage_setDelegation(self, location, RESPONSE=None):
        "Change the catalog delegated to"
        self.setLocation(location or None)

        if RESPONSE is not None:
            RESPONSE.redirect('manage_Delegation')

        return location
        
    
    manage_Statistics = HTMLFile('Statistics', globals())

    def manage_size(self):
        size = 0
        for q in self._queues:
            size += len(q)

        return size

    def manage_process(self, RESPONSE):
        "Web UI to manually process queues"
        
        # make sure we have necessary perm
        self.getZCatalog('catalog_object')
        self.getZCatalog('uncatalog_object')
        self.process()
        RESPONSE.redirect('manage_Statistics')
    
    # Provide Zope 2 offerings

    index_html = None

    meta_type = 'ZCatalog Queue'

    __allow_access_to_unprotected_subobjects__ = 0
    manage_options=(
        (
        {'label': 'Delegation', 'action': 'manage_Delegation',
         'help':('QueueCatalog','QueueCatalog-Delegation.stx')},

        {'label': 'Statistics', 'action': 'manage_Statistics',
         'help':('QueueCatalog','QueueCatalog-Statistics.stx')},
        )
        +SimpleItem.manage_options
        )

    security = ClassSecurityInformation()

    security.declareObjectPublic()

    security.declarePublic('catalog_object', 'uncatalog_object',
                           'manage_process')

    security.declareProtected(
        'View management screens',
        'manage_Delegation', 'manage_setDelegation',
        'manage_Statistics', 'manage_getLocation',
        'manage_size'
        )
    
def cataloged(catalog, path):
    getrid = getattr(catalog, 'getrid', None)
    if getrid is None:
        
        # This is an old catalog that doesn't provide an API for
        # getting an object's rid (and thus determining whether the
        # object is already cataloged).
        
        # We'll just use our knowledge of the internal structure.
        
        rid = catalog._catalog.uids.get(path)
        
    else:
        rid = catalog.getrid(path)

    return rid is not None

class AttrWrapper(Base):
    "Special object that allows us to use acquisition in QueueCatalog "
    "attribute access"

    def __init__(self, name):
        self.__name__ = name

    def __of__(self, wrappedQueueCatalog):
        return wrappedQueueCatalog.getZCatalog(self.__name__)

__doc__ = QueueCatalog.__doc__ + __doc__
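QueueCatalog spreads events over `buckets` separate CatalogEventQueue objects, chosen by `hash(uid) % self._buckets`; presumably this keeps concurrent transactions that touch different objects from writing the same persistent queue. The sketch below shows the bucket choice with one assumption swapped in: it uses `zlib.crc32` rather than the built-in `hash()`, because Python 3 randomizes string hashes per process, and `catalog_object()` relies on finding a uid's pending event in the same bucket that `_update()` wrote it to. `bucket_index` is a hypothetical name.

```python
import zlib


def bucket_index(uid, buckets=1009):
    """Return the queue bucket for a path-style uid (hypothetical helper)."""
    if not uid.startswith('/'):
        raise TypeError("uid must start with '/'")
    # crc32 is identical in every process; str hashes in Python 3 are
    # randomized per process unless PYTHONHASHSEED is fixed.
    return zlib.crc32(uid.encode('utf-8')) % buckets
```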


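Because the ZCatalog API does not distinguish adds from updates, `catalog_object()` infers the event type from the catalog and the pending queue. A minimal sketch of that decision, assuming a set of cataloged uids in place of the `cataloged()` helper and a dict of pending event types in place of `getEvent()`; `classify` is an illustrative name.

```python
# Event codes mirror the constants defined in CatalogEventQueue.py.
REMOVED, ADDED, CHANGED, CHANGED_ADDED = 0, 1, 2, 3
ADDED_EVENTS = (CHANGED, ADDED, CHANGED_ADDED)


def classify(uid, cataloged_uids, pending):
    """Decide whether a catalog_object() call is an add or a change.

    cataloged_uids: set of uids already in the catalog (hypothetical model).
    pending: dict uid -> latest queued event type (hypothetical model).
    """
    if uid in cataloged_uids:
        return CHANGED
    # Not in the catalog yet, but an unprocessed add may already be queued.
    if pending.get(uid) in ADDED_EVENTS:
        return CHANGED
    return ADDED
```

A pending REMOVED does not count as an add, so re-cataloging an object whose removal has not been processed yet is queued as a fresh ADDED.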


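`process()` then replays each coalesced event against the real catalog, skipping events that would have no effect. The sketch below models the catalog as a dict and assumes a `load` callable in place of `unrestrictedTraverse()`; `apply_events` is a hypothetical name.

```python
# Event codes mirror the constants defined in CatalogEventQueue.py.
REMOVED, ADDED, CHANGED, CHANGED_ADDED = 0, 1, 2, 3


def apply_events(catalog, events, load):
    """Apply coalesced queue events to a dict-based stand-in catalog.

    catalog: dict uid -> indexed object (hypothetical model).
    events: dict uid -> (generation, event type), as returned by process().
    load: callable uid -> object, standing in for unrestrictedTraverse().
    """
    for uid, (generation, event) in events.items():
        if event == REMOVED:
            # Removing something that was never cataloged is a no-op.
            catalog.pop(uid, None)
        elif event == CHANGED and uid not in catalog:
            # A change to an uncataloged object has no effect.
            continue
        else:
            # ADDED and CHANGED_ADDED always (re)index; so does CHANGED
            # for objects that are already in the catalog.
            catalog[uid] = load(uid)
    return catalog
```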





=== Added File Products/QueueCatalog/Statistics.dtml ===
<dtml-var manage_page_header>
<dtml-var manage_tabs>

&dtml-manage_size; items queued.

<form action="manage_process">
  <input type="submit" value="Process" />
</form>

<dtml-var manage_page_footer>


=== Added File Products/QueueCatalog/__init__.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
# 
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
# 
##############################################################################

__doc__='''Package wrapper for Queued Catalogs

$Id: __init__.py,v 1.1 2002/07/29 14:45:25 jim Exp $'''
__version__='$$'[11:-2]

# Placeholder for Zope Product data
misc_ = {}

from QueueCatalog import QueueCatalog
from Products.PageTemplates.PageTemplateFile import PageTemplateFile

manage_addQueueCatalogForm = PageTemplateFile(
    'add.pt', globals(), __name__='manage_addQueueCatalogForm')


def manage_addQueueCatalog(self, id, REQUEST=None):
    "Add a Catalog Queue"
    ob = QueueCatalog()
    ob.id = id
    self._setObject(id, ob)
    if REQUEST is not None:

        try:
            u = self.DestinationURL()
        except AttributeError:
            u = REQUEST['URL1']

        REQUEST.RESPONSE.redirect(u+'/manage_main')

def initialize(context):
    context.registerClass(
        QueueCatalog,
        permission='Add ZCatalogs',
        constructors=(manage_addQueueCatalogForm, manage_addQueueCatalog, ),
        icon='QueueCatalog.gif',
        )
    #context.registerHelp()
    #context.registerHelpTitle('Zope Help')


=== Added File Products/QueueCatalog/add.pt ===
<html><head></head><body>

<form action="manage_addQueueCatalog">
  Enter an id: <input name="id">
  <br>
  <input type="submit" value="Add" />
</form>

</body></html>