Project description
A catalog queue holds pending catalog indexing operations. The basic idea is to queue these operations so that:
Operations can be batched for greater efficiency
Application requests don’t have to wait for indexing to be done
The benefits of queueing are especially significant when text indexes are used.
Detailed Documentation
Using Queues
A queue is created by instantiating a zc.catalogqueue.queue.CatalogQueue object:
>>> import zc.catalogqueue.queue
>>> queue = zc.catalogqueue.queue.CatalogQueue()
We can pass a queue size. It should be a prime number. The default is 1009, which is a bit large.
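The text does not say why the size should be prime. One plausible reason, assuming the size is used as a number of internal buckets and that ids are assigned to buckets by a modulo (an assumption, not something stated here), is that a prime count spreads regularly spaced ids more evenly. The sketch below uses a hypothetical `bucket_counts` helper to show the effect; it does not use zc.catalogqueue itself:

```python
from collections import Counter


def bucket_counts(ids, size):
    """Count how many ids land in each bucket under `id % size`.

    Hypothetical helper for illustration only; the modulo scheme is an
    assumption, not a documented detail of CatalogQueue.
    """
    return Counter(i % size for i in ids)


# 100 ids spaced 10 apart, as a regular allocation pattern might produce.
ids = range(0, 1000, 10)

composite = bucket_counts(ids, 10)  # size shares a factor with the spacing
prime = bucket_counts(ids, 11)      # prime size, no shared factor

print("size 10: buckets used =", len(composite), "largest =", max(composite.values()))
print("size 11: buckets used =", len(prime), "largest =", max(prime.values()))
```

With the composite size, every id collapses into a single bucket; with the prime size, the same ids are spread over all buckets.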
>>> queue = zc.catalogqueue.queue.CatalogQueue(11)
Typically, queues are registered as zc.catalogqueue.interfaces.ICatalogQueue utilities.
>>> import zope.interface, pprint
>>> pprint.pprint(sorted(zope.interface.providedBy(queue)), width=1)
[<InterfaceClass zc.catalogqueue.interfaces.ICatalogQueue>,
 <InterfaceClass persistent.interfaces.IPersistent>]
There are some bits of information that the queue maintains regarding its own processing state. The time of last processing and the total number of cataloging events processed are available. Since this queue hasn’t been processed yet, these have some initial values:
>>> print queue.lastProcessedTime
None
>>> queue.totalProcessed
0
The length of the queue provides access to the number of pending cataloging events:
>>> len(queue)
0
Queues are used in two ways. As content is modified, we call the add, update, and remove methods on the queue:
>>> queue.add(1)
>>> queue.update(1)
>>> queue.remove(1)

>>> queue.update(2)
>>> queue.update(2)

>>> queue.add(3)
>>> queue.update(3)
>>> queue.add(3)
Traceback (most recent call last):
...
TypeError: Attempt to add an object that is already in the catalog

>>> queue.update(4)
>>> queue.update(4)
>>> queue.update(4)

>>> queue.remove(5)
>>> queue.update(5)
Traceback (most recent call last):
...
TypeError: Attempt to change an object that has been removed

>>> queue.update(0)
>>> queue.update(0)
At this point, we’ve added several events but haven’t processed the queue, so we expect lastProcessedTime and totalProcessed to be unchanged, and the queue length to reflect the pending tasks:
>>> print queue.lastProcessedTime
None
>>> queue.totalProcessed
0
>>> len(queue)
6
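The length is 6 rather than 15 because events for the same object collapse into a single pending entry. The following is a minimal sketch of that idea, not the library's implementation: `PendingEvents` and its state names are hypothetical, and only the rules visible in the calls above are modeled.

```python
ADDED, CHANGED, REMOVED = 'added', 'changed', 'removed'


class PendingEvents:
    """Toy model that keeps one pending event per object id."""

    def __init__(self):
        self._events = {}

    def __len__(self):
        return len(self._events)

    def add(self, docid):
        if self._events.get(docid) in (ADDED, CHANGED):
            raise TypeError(
                'Attempt to add an object that is already in the catalog')
        # Assumption: an add after a pending removal is recorded as an add;
        # the example above does not show this case.
        self._events[docid] = ADDED

    def update(self, docid):
        current = self._events.get(docid)
        if current == REMOVED:
            raise TypeError(
                'Attempt to change an object that has been removed')
        # An object that was added and then changed stays "added".
        self._events[docid] = ADDED if current == ADDED else CHANGED

    def remove(self, docid):
        self._events[docid] = REMOVED


# Replay the same calls as in the example above.
steps = [
    ('add', 1), ('update', 1), ('remove', 1),
    ('update', 2), ('update', 2),
    ('add', 3), ('update', 3), ('add', 3),        # second add is rejected
    ('update', 4), ('update', 4), ('update', 4),
    ('remove', 5), ('update', 5),                 # update after remove is rejected
    ('update', 0), ('update', 0),
]
pending = PendingEvents()
errors = []
for method, docid in steps:
    try:
        getattr(pending, method)(docid)
    except TypeError as exc:
        errors.append((method, docid, str(exc)))

print(len(pending), len(errors))
```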
Periodically, we call process on the queue. We need to pass an ids object, a collection of injection (catalog) objects, and a limit on the number of events to process:
>>> class Ids:
...     def queryObject(self, id, default=None):
...         if not id:
...             return default
...         return "object %s" % id

>>> class Injection:
...     def __init__(self, name):
...         self.name = name
...     def index_doc(self, docid, value):
...         print self.name, 'indexing', docid, value
...     def unindex_doc(self, docid):
...         print self.name, 'unindexing', docid

>>> queue.process(Ids(), [Injection('cat1'), Injection('cat2')], 10)
cat1 unindexing 1
cat2 unindexing 1
cat1 indexing 2 object 2
cat2 indexing 2 object 2
cat1 indexing 3 object 3
cat2 indexing 3 object 3
cat1 indexing 4 object 4
cat2 indexing 4 object 4
cat1 unindexing 5
cat2 unindexing 5
6
There are a number of things to note about this example:
Each object is processed only once.
What happens depends on the last event.
Object 0 wasn’t indexed because queryObject returned None. We ignore events for objects that have been removed from the intid utility.
The number of objects processed is returned.
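The points above can be sketched as a small stand-alone function. This is an illustrative model under stated assumptions, not the queue's actual code: the `process` function, the `'index'`/`'unindex'` labels, the sorted processing order, and the `RecordingCatalog` class are all hypothetical.

```python
class Ids:
    """Same lookup behavior as the Ids class in the example above."""

    def queryObject(self, id, default=None):
        if not id:
            return default
        return "object %s" % id


class RecordingCatalog:
    """Hypothetical catalog that records calls instead of printing them."""

    def __init__(self, name, calls):
        self.name = name
        self.calls = calls

    def index_doc(self, docid, value):
        self.calls.append((self.name, 'indexing', docid, value))

    def unindex_doc(self, docid):
        self.calls.append((self.name, 'unindexing', docid))


def process(pending, ids, catalogs, limit):
    """Apply up to `limit` pending events and return how many were consumed.

    `pending` maps docid -> 'index' or 'unindex', a simplification chosen
    for this sketch.
    """
    done = 0
    for docid in sorted(pending):  # assumed order, for determinism
        if done >= limit:
            break
        event = pending.pop(docid)
        done += 1
        if event == 'unindex':
            for catalog in catalogs:
                catalog.unindex_doc(docid)
            continue
        obj = ids.queryObject(docid)
        if obj is None:
            continue  # object is gone: the event is dropped but still counted
        for catalog in catalogs:
            catalog.index_doc(docid, obj)
    return done


calls = []
pending = {1: 'unindex', 2: 'index', 3: 'index', 4: 'index',
           5: 'unindex', 0: 'index'}
catalogs = [RecordingCatalog('cat1', calls), RecordingCatalog('cat2', calls)]
processed = process(pending, Ids(), catalogs, 10)
print(processed, len(calls))
```

Each pending id produces at most one call per catalog, and the lookup failure for id 0 yields no catalog call while still counting toward the return value.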
The processing information has been updated on the queue:
>>> queue.lastProcessedTime # doctest: +ELLIPSIS
datetime.datetime(... tzinfo=<UTC>)
>>> queue.totalProcessed
6

>>> previous_time = queue.lastProcessedTime
The length of the queue now indicates that no further events are pending:
>>> len(queue)
0
If we process the queue without additional events, we’ll just get 0 back:
>>> queue.process(Ids(), [Injection('cat1'), Injection('cat2')], 10)
0
The historical processing information is updated:
>>> queue.lastProcessedTime # doctest: +ELLIPSIS
datetime.datetime(... tzinfo=<UTC>)
>>> queue.lastProcessedTime > previous_time
True
>>> queue.totalProcessed
6

>>> len(queue)
0
Of course, the limit argument limits how many events we process:
>>> for i in range(10):
...     queue.update(i)
>>> len(queue)
10
>>> queue.process(Ids(), [Injection('cat1')], 5)
cat1 indexing 1 object 1
cat1 indexing 2 object 2
cat1 indexing 3 object 3
cat1 indexing 4 object 4
5
>>> queue.totalProcessed
11
>>> len(queue)
5

>>> queue.process(Ids(), [Injection('cat1')], 5)
cat1 indexing 5 object 5
cat1 indexing 6 object 6
cat1 indexing 7 object 7
cat1 indexing 8 object 8
cat1 indexing 9 object 9
5
>>> queue.totalProcessed
16
>>> len(queue)
0
(Remember that 0 isn’t processed because it can’t be found.)
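Because process returns the number of events it consumed, a periodic job could drain the queue in fixed-size batches and stop once a batch comes back short. The sketch below shows one way to do that; `drain` is a hypothetical helper, and `FakeQueue` is a stand-in that only mimics the process(ids, catalogs, limit) call shape so the example runs without zc.catalogqueue.

```python
class FakeQueue:
    """Stand-in exposing the same process(ids, catalogs, limit) call shape."""

    def __init__(self, pending):
        self._pending = list(pending)

    def __len__(self):
        return len(self._pending)

    def process(self, ids, catalogs, limit):
        # Consume at most `limit` pending entries and report how many.
        batch = self._pending[:limit]
        self._pending = self._pending[limit:]
        return len(batch)


def drain(queue, ids, catalogs, limit):
    """Call process repeatedly until a batch returns fewer than `limit`."""
    batches = []
    while True:
        count = queue.process(ids, catalogs, limit)
        batches.append(count)
        if count < limit:
            return batches


queue = FakeQueue(range(12))
batches = drain(queue, ids=None, catalogs=[], limit=5)
print(batches, len(queue))
```

Keeping the limit small bounds the work done per call; whether each batch should also be committed separately depends on the application and is not covered here.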
When an object can’t be found, a warning is logged:
>>> import zope.testing.loggingsupport
>>> handler = zope.testing.loggingsupport.InstalledHandler('zc')
>>> queue.update(0)
>>> queue.process(Ids(), [Injection('cat1')], 5)
1

>>> print handler
zc.catalogqueue.queue WARNING
  Couldn't find object for 0

>>> handler.uninstall()
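Outside a test that uses zope.testing, the same kind of warning could be captured with the standard library alone. The sketch below assumes only the logger name shown in the output above; `ListHandler` is a hypothetical helper, and the warning is emitted by hand as a stand-in for what the queue would log.

```python
import logging


class ListHandler(logging.Handler):
    """Collect log records in a list so they can be inspected later."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


logger = logging.getLogger('zc.catalogqueue.queue')
handler = ListHandler()
logger.addHandler(handler)
try:
    # Stand-in for the queue's own warning; a real run would call
    # queue.process(...) here instead.
    logger.warning("Couldn't find object for %s", 0)
finally:
    logger.removeHandler(handler)  # counterpart of handler.uninstall()

lines = ['%s %s %s' % (r.name, r.levelname, r.getMessage())
         for r in handler.records]
print(lines)
```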
Edge case
If an “old” state has two ‘ADDED’ events, and the committed state processes the queue, and the “new” state modifies one of the objects marked for addition, the code marks the other for removal.
>>> from zc.catalogqueue.CatalogEventQueue import (
...     CatalogEventQueue, ADDED, REMOVED, CHANGED, CHANGED_ADDED)
>>> cq = CatalogEventQueue()

>>> def resolve(old, committed, new):
...     return sorted(cq._p_resolveConflict(
...         {'_conflict_policy': 0, '_data': old},
...         {'_conflict_policy': 0, '_data': committed},
...         {'_conflict_policy': 0, '_data': new}
...         )['_data'].items())

>>> resolve({1: (1, ADDED), 2: (1, ADDED)}, {},
...         {1: (1, ADDED), 2: (3, REMOVED), 3: (1, ADDED)})
[(2, (3, 0)), (3, (1, 1))]