[Zope-dev] ZCatalog patch for indexing larger amounts of text

abel deuring a.deuring@satzbau-gmbh.de
Fri, 25 May 2001 15:22:33 +0200



Hi all,

I ran into problems similar to those described by Erik Enge a few weeks
ago when indexing a somewhat larger amount of text (~500 MB in ~194000
objects): Zope tends to "eat up" all available memory (640 MB in my
case) and swap space.

The main problem seems to be that the classes ZCatalog, Catalog,
UnTextIndex and [Globbing]Lexicon try to do all partial indexing tasks
at once: the Catalog instance collects metadata for each indexed
object; the UnTextIndex instance adds new words to the Lexicon instance,
builds an entry for each indexed object in _unindex and, most
importantly, updates _index "just in time".

The lexicon contains more than 1 million entries for "my" data, and the
_index entries for common words may contain a considerable number of
references to indexed objects. I am relatively new to Python and Zope,
so I have no idea how much memory a reference to an indexed object in
UnTextIndex._index actually requires. But assuming 10 bytes for each
reference, and assuming that on average each indexed word/Lexicon entry
occurs in 20 objects, the _index object will require around 200 MB if
it is kept completely in RAM.
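
For illustration, the back-of-the-envelope estimate under exactly these
assumptions (the 10 bytes per reference is a guess, not a measured figure):

    # assumed figures, not measurements
    lexicon_entries = 1000000      # words in the Lexicon
    postings_per_word = 20         # average number of objects per word
    bytes_per_reference = 10       # guessed size of one posting in _index

    total = lexicon_entries * postings_per_word * bytes_per_reference
    print total                    # 200000000 bytes, i.e. roughly 200 MB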

This can easily be avoided if the process of updating
UnTextIndex._index is separated from the rest of the indexing work. With
the attached patch, UnTextIndex.index_object writes the data for new
_index entries as lines containing wordId, documentId and score into a
pipe connected to good old sort(1). The output from sort is read by the
new UnTextIndex method updateWordBTree. This way, each entry of
UnTextIndex._index needs to be updated only once, while the original
implementation of UnTextIndex required an update for each occurrence of
a word in every indexed object.
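
In outline, the data flow looks like this (a minimal sketch of the
mechanism only, not the patch itself; 'postings' and the loop variables
are just illustrative names):

    import popen2

    # one sort(1) subprocess per text index (cf. Catalog.openPipes in the patch)
    pipe = popen2.Popen3("sort", 1)

    # indexing phase: index_object only emits postings, it never touches _index
    for wid, documentId, score in postings:
        pipe.tochild.write("%d %d %d\n" % (wid, documentId, score))

    # merge phase: read the postings back grouped by word id, so every
    # _index row has to be loaded and stored only once (cf. updateWordBTree)
    pipe.tochild.close()
    line = pipe.fromchild.readline()
    while line:
        wid, documentId, score = map(int, line.split())
        # ... insert (documentId, score) into _index[wid] here ...
        line = pipe.fromchild.readline()
    pipe.fromchild.close()
    pipe.wait()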

There are two obvious points where the patch could be optimized: (1) the
"perfectly sorted" output of sort(1) is obviously not very efficient
input for building the IIBTree indexRow in updateWordBTree, and (2) the
type check on indexRow in updateWordBTree can of course be avoided once
the row has been promoted to an IIBTree -- for example with an inner
"readline loop", as sketched below. (updateWordBTree is simply a
modified version of insertForwardIndexEntry -- not very clever, but the
easiest thing to implement :)
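
A rough sketch of what such an inner loop could look like (an idea only,
not part of the patch; it assumes the lines arrive grouped by word id,
that index and IIBTree are the ones already used in UnTextIndex.py, and
for simplicity it ignores rows that already exist in _index):

    def buildRows(readline, index, IIBTree):
        # all postings for one word id are adjacent in the sorted input,
        # so each _index row is built in memory and stored exactly once
        line = readline()
        while line:
            wid, documentId, score = map(int, line.split())
            row = {documentId: score}
            line = readline()
            while line and int(line.split()[0]) == wid:
                otherDoc, otherScore = map(int, line.split())[1:]
                row[otherDoc] = otherScore
                line = readline()
            if len(row) == 1:
                index[wid] = (documentId, score)   # keep the tuple optimization
            elif len(row) <= 3:
                index[wid] = row                   # small rows stay plain mappings
            else:
                index[wid] = IIBTree(row)          # big rows get their own record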

Abel

PS: Watching Zope index the above-mentioned data with subtransactions
enabled, I wondered where all the data from the finished subtransactions
is stored. Data.fs is not updated, and I could not find any temporary
files. (OK, I have not yet had a chance to look at what is going on
while updateWordBTree is running; during my tests it was called in the
middle of the night -- but while ZCatalog.ZopeFindAndApply is running,
nothing seems to happen.)
Attachment: Catalog.py.diff

--- Catalog.py.orig	Thu Apr 19 19:07:29 2001
+++ Catalog.py	Fri May 25 13:46:16 2001
@@ -105,6 +105,7 @@
 from SearchIndex.randid import randid
 
 import time
+import popen2
 
 def orify(seq,
           query_map={
@@ -379,8 +380,24 @@
         self.indexes = indexes
 
     # the cataloging API
+    
+    def openPipes(self):
+        pipes={}
+        for name in self.indexes.keys():
+            index = self.indexes[name]
+            index = index.__of__(self)
+            if hasattr(index, 'updateWordBTree'):
+                pipes[name] = popen2.Popen3("sort", 1)
+        return pipes
+
+    def updateWordBTrees(self, pipes, threshold=None):
+        for name in self.indexes.keys():
+            index = self.indexes[name]
+            index = index.__of__(self)
+            if hasattr(index, 'updateWordBTree'):
+                index.updateWordBTree(pipes[name], threshold)
 
-    def catalogObject(self, object, uid, threshold=None):
+    def catalogObject(self, object, uid, threshold=None, pipes=None):
         """ 
         Adds an object to the Catalog by iteratively applying it
         all indexes.
@@ -439,17 +456,34 @@
             self.paths[index] = uid
             
         total = 0
-        for x in self.indexes.values():
-            ## tricky!  indexes need to acquire now, and because they
-            ## are in a standard dict __getattr__ isn't used, so
-            ## acquisition doesn't kick in, we must explicitly wrap!
-            x = x.__of__(self)
-            if hasattr(x, 'index_object'):
-                blah = x.index_object(index, object, threshold)
-                total = total + blah
-            else:
-                LOG('Catalog', ERROR, ('catalogObject was passed '
-                                       'bad index object %s.' % str(x)))
+        if pipes is None:
+            for x in self.indexes.values():
+                ## tricky!  indexes need to acquire now, and because they
+                ## are in a standard dict __getattr__ isn't used, so
+                ## acquisition doesn't kick in, we must explicitly wrap!
+                x = x.__of__(self)
+                if hasattr(x, 'index_object'):
+                    blah = x.index_object(index, object, threshold)
+                    total = total + blah
+                else:
+                    LOG('Catalog', ERROR, ('catalogObject was passed '
+                                           'bad index object %s.' % str(x)))
+        else:
+            for name in self.indexes.keys():
+                x = self.indexes[name]
+                ## tricky!  indexes need to acquire now, and because they
+                ## are in a standard dict __getattr__ isn't used, so
+                ## acquisition doesn't kick in, we must explicitly wrap!
+                x = x.__of__(self)
+                if hasattr(x, 'index_object'):
+                    if pipes.has_key(name):
+                        blah = x.index_object(index, object, threshold, pipes[name])
+                    else:
+                        blah = x.index_object(index, object, threshold)
+                    total = total + blah
+                else:
+                    LOG('Catalog', ERROR, ('catalogObject was passed '
+                                           'bad index object %s.' % str(x)))
 
         return total
 

Attachment: UnTextIndex.py.diff

--- UnTextIndex.py.orig	Wed Mar 28 01:43:11 2001
+++ UnTextIndex.py	Fri May 25 14:01:46 2001
@@ -305,7 +305,7 @@
             # put our first entry in, and use a tuple to save space
             index[entry] = (documentId, score)
 
-    def index_object(self, documentId, obj, threshold=None):
+    def index_object(self, documentId, obj, threshold=None, pipe=None):
         """ Index an object:
         'documentId' is the integer id of the document
         
@@ -356,9 +356,14 @@
         
         # Now index the words. Note that the new xIBTrees are clever
         # enough to do nothing when there isn't a change. Woo hoo.
-        insert=self.insertForwardIndexEntry
-        for wid, score in widScores.items():
-            insert(wid, documentId, score)
+        if pipe is None:
+            insert=self.insertForwardIndexEntry
+            for wid, score in widScores.items():
+                insert(wid, documentId, score)
+        else:
+            write = pipe.tochild.write
+            for wid, score in widScores.items():
+                write(`wid` + " " + `documentId` + " " + `score` + "\n")
 
         # Save the unindexing info if it's changed:
         wids=widScores.keys()
@@ -367,6 +372,80 @@
 
         return len(wids)
 
+    def updateWordBTree(self, pipe, threshold=None):
+        pipe.tochild.close()
+        readline = pipe.fromchild.readline
+        print "waiting for pipe input"
+        
+        index = self._index
+        
+        s = readline()
+        lastentry = None
+        count = 0
+        
+        while len(s) > 4:
+            (entry, documentId, score) = s.split(' ')
+            entry = int(entry)
+            documentId = int(documentId)
+            score = int(score)
+            
+            # copied from insertForwardIndexEntry:
+            if entry != lastentry:
+                indexRow = index.get(entry, None)
+                if threshold is not None:
+                    count = count + 1
+                    if count >= threshold:
+                        get_transaction().commit(1)
+                        self._p_jar.cacheFullSweep(3)
+                        print "commit indexer", s,
+                        count = 0
+            
+            if indexRow is not None:
+                if type(indexRow) is TupleType:
+                    # Tuples are only used for rows which have only
+                    # a single entry.  Since we now need more, we'll
+                    # promote it to a mapping object (dictionary).
+
+                    # First, make sure we're not already in it, if so
+                    # update the score if necessary.
+                    if indexRow[0] == documentId:
+                        if indexRow[1] != score:
+                            indexRow = (documentId, score)
+                            index[entry] = indexRow
+                    else:
+                        indexRow={
+                            indexRow[0]: indexRow[1],
+                            documentId: score,
+                            }
+                        index[entry] = indexRow
+                else:
+                     # xxx optimization: put a "readline loop"
+                     # into _this_ block!!
+                    if indexRow.get(documentId, -1) != score:
+                        # score changed (or new entry)
+                        
+                        if type(indexRow) is DictType:
+                            indexRow[documentId] = score
+                            if len(indexRow) > 3:
+                                # Big enough to give it's own database record
+                                indexRow=IIBTree(indexRow) 
+                            index[entry] = indexRow
+                        else:
+                            indexRow[documentId] = score
+            else:
+                # We don't have any information at this point, so we'll
+                # put our first entry in, and use a tuple to save space
+                indexRow = index[entry] = (documentId, score)
+            
+            lastentry = entry
+            s = readline()
+        
+        pipe.fromchild.close()
+        res = pipe.wait()
+        # xxx I'm new to Python and Zope... How should an error 
+        # be handled? raise an exception?
+        return
+    
     def _subindex(self, source, wordScores, last, splitter):
         """Recursively handle multi-word synonyms"""
         for word in splitter(source):

Attachment: ZCatalog.py.diff

--- ZCatalog.py.orig	Wed Mar 21 23:48:04 2001
+++ ZCatalog.py	Fri May 25 13:48:04 2001
@@ -359,6 +359,7 @@
         obj = REQUEST.PARENTS[1]
         path = string.join(obj.getPhysicalPath(), '/')
 
+        pipelist = self._catalog.openPipes()
         
         results = self.ZopeFindAndApply(obj,
                                         obj_metatypes=obj_metatypes,
@@ -372,8 +373,11 @@
                                         search_sub=1,
                                         REQUEST=REQUEST,
                                         apply_func=self.catalog_object,
-                                        apply_path=path)
+                                        apply_path=path,
+                                        pipes=pipelist)
 
+        self._catalog.updateWordBTrees(pipelist, self.threshold)
+        
         elapse = time.time() - elapse
         c_elapse = time.clock() - c_elapse
         
@@ -412,7 +416,7 @@
             RESPONSE.redirect(URL1 + '/manage_catalogIndexes?manage_tabs_message=Index%20Deleted')
 
 
-    def catalog_object(self, obj, uid=None):
+    def catalog_object(self, obj, uid=None, pipes=None):
         """ wrapper around catalog """
 
         if uid is None:
@@ -426,13 +430,12 @@
         elif type(uid) is not StringType:
             raise CatalogError('The object unique id must be a string.')
 
-        self._catalog.catalogObject(obj, uid, None)
+        self._catalog.catalogObject(obj, uid, None, pipes)
         # None passed in to catalogObject as third argument indicates
         # that we shouldn't try to commit subtransactions within any
         # indexing code.  We throw away the result of the call to
         # catalogObject (which is a word count), because it's
         # worthless to us here.
-        
         if self.threshold is not None:
             # figure out whether or not to commit a subtransaction.
             t = id(get_transaction())
@@ -449,6 +452,7 @@
             # to commit a subtransaction.  The semantics here mean that
             # we should commit a subtransaction if our threshhold is
             # exceeded within the boundaries of the current transaction.
+            
             if self._v_total > self.threshold:
                 get_transaction().commit(1)
                 self._p_jar.cacheFullSweep(3)
@@ -574,7 +578,7 @@
                          obj_permission=None, obj_roles=None,
                          search_sub=0,
                          REQUEST=None, result=None, pre='',
-                         apply_func=None, apply_path=''):
+                         apply_func=None, apply_path='', pipes=None):
         """Zope Find interface and apply
 
         This is a *great* hack.  Zope find just doesn't do what we
@@ -649,7 +653,7 @@
                 )
                 ):
                 if apply_func:
-                    apply_func(ob, (apply_path+'/'+p))
+                    apply_func(ob, (apply_path+'/'+p), pipes)
                 else:
                     add_result((p, ob))
                     dflag=0
@@ -661,7 +665,7 @@
                                       obj_permission, obj_roles,
                                       search_sub,
                                       REQUEST, result, p,
-                                      apply_func, apply_path)
+                                      apply_func, apply_path, pipes)
             if dflag: ob._p_deactivate()
 
         return result
