Perform durable tasks asynchronously
Introduction
Goals
The zc.async package provides a way to schedule jobs to be performed out-of-band from your current thread. The job might be done in another thread or another process, possibly on another machine. Here are some example core use cases.
You want to let users do something that requires a lot of system resources from your application, such as creating a large PDF. Naively done, six or seven simultaneous PDF requests will consume your application thread pool and could make your application unresponsive to any other users.
You want to let users spider a web site; communicate with a credit card company; query a large, slow LDAP database on another machine; or do some other action that generates network requests from the server. System resources might not be a problem, but, again, if something goes wrong, several requests could make your application unresponsive.
Perhaps because of resource contention, you want to serialize work that can be done asynchronously, such as updating a single data structure like a catalog index.
You want to decompose and parallelize a single job across many machines so it can be finished faster.
You have an application job that you discover is taking longer than users can handle, even after you optimize it. You want a quick fix to move the work out-of-band.
Many of these core use cases involve end-users being able to start potentially expensive processes, on demand. Basic scheduled tasks are also provided by this package, though recurrence must be something you arrange.
History
This is a second-generation design. The first generation was zasync, a mission-critical and successful Zope 2 product in use for a number of high-volume Zope 2 installations. [1] It’s worthwhile noting that zc.async has absolutely no backwards compatibility with zasync and zc.async does not require Zope (although it can be used in conjunction with it, details below).
Design Overview
Overview: Usage
Looking at the design from the perspective of regular usage, your code obtains a queue, which is a place to register jobs to be performed asynchronously.
Your application calls put on the queue to register a job. The job must be a pickleable, callable object. A global function, a callable persistent object, a method of a persistent object, or a special zc.async.job.Job object (discussed later) are all examples of suitable objects. The job by default is registered to be performed as soon as possible, but can be registered to be called at a certain time.
The put call will return a zc.async.job.Job object. This object represents both the callable and its deferred result. It has information about the job requested, the current state of the job, and the result of performing the job.
An example spelling for registering a job might be self.pending_result = queue.put(self.performSpider). The returned object can be stored and polled to see when the job is complete; or the job can be configured to do additional work when it completes (such as storing the result in a data structure).
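To make that concrete, here is a minimal sketch of the store-and-poll pattern, assuming queue was obtained as described in the Usage section below, and where use_result is a hypothetical consumer of the value:

    import transaction
    import zc.async.interfaces

    # register the job; self.performSpider is a method of a persistent
    # object, so the reference is pickleable
    self.pending_result = queue.put(self.performSpider)
    transaction.commit()

    # ...later, in another transaction, after our view of the database
    # has been synced...
    if self.pending_result.status == zc.async.interfaces.COMPLETED:
        use_result(self.pending_result.result)  # hypothetical consumer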
Overview: Mechanism
Multiple processes, typically spread across multiple machines, can connect to the queue and claim and perform work. As with other collections of processes that share pickled objects, these processes generally should share the same software (though some variations on this constraint should be possible).
A process that should claim and perform work, in addition to a database connection and the necessary software, needs a dispatcher with a reactor to provide a heartbeat. The dispatcher will rely on one or more persistent agents in the queue (in the database) to determine which jobs it should perform.
A dispatcher is in charge of dispatching queued work for a given process to worker threads. It works with one or more queues and a single reactor. It has a universally unique identifier (UUID), which is usually an identifier of the application instance in which it is running. The dispatcher starts jobs in dedicated threads.
A reactor is something that can provide an eternal loop, or heartbeat, to power the dispatcher. It can be the main twisted reactor (in the main thread); another instance of a twisted reactor (in a child thread); or any object that implements a small subset of the twisted reactor interface (see discussion in dispatcher.txt, and example testing reactor in testing.py, used below).
An agent is a persistent object in a queue that is associated with a dispatcher and is responsible for picking jobs and keeping track of them. Zero or more agents within a queue can be associated with a dispatcher. Each agent for a given dispatcher in a given queue is identified uniquely with a name [2].
Generally, these work together as follows. The reactor calls the dispatcher. The dispatcher tries to find the mapping of queues in the database root under a key of zc.async (see constant zc.async.interfaces.KEY). If it finds the mapping, it iterates over the queues (the mapping’s values) and asks each queue for the agents associated with the dispatcher’s UUID. The dispatcher then is responsible for seeing what jobs its agents want to do from the queue, and providing threads and connections for the work to be done. The dispatcher then asks the reactor to call itself again in a few seconds.
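In rough pseudocode, a single heartbeat of that cycle might look like the following sketch. This is not the actual implementation; claimJob and performInThread are hypothetical spellings for behavior that really spans several dispatcher and agent methods.

    # a sketch of one poll cycle, not the actual implementation
    def poll(dispatcher):
        queues = dispatcher.conn.root().get(zc.async.interfaces.KEY)
        if queues is not None:
            for queue in queues.values():
                for agent in queue.dispatchers[dispatcher.UUID].values():
                    job = agent.claimJob()  # hypothetical spelling
                    while job is not None:
                        # run the job in a worker thread with its own
                        # ZODB connection
                        dispatcher.performInThread(job)
                        job = agent.claimJob()
        # ask the reactor to call us again in a few seconds
        dispatcher.reactor.callLater(
            dispatcher.poll_interval, poll, dispatcher)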
Reading More
This document continues on with four other main sections: Usage, Configuration, Configuration with Zope 3, and Tips and Tricks.
Other documents in the package are primarily geared as maintainer documentation, though the author has tried to make them readable and understandable.
Usage
Overview and Basics
The basic usage of zc.async does not depend on a particular configuration of the back-end mechanism for getting the jobs done. Moreover, on some teams, it will be the responsibility of one person or group to configure zc.async, but a service available to the code of all team members. Therefore, we begin our detailed discussion with regular usage, assuming configuration has already happened. Subsequent sections discuss configuring zc.async with and without Zope 3.
So, let’s assume we have a queue with dispatchers, reactors and agents all waiting to fulfill jobs placed into the queue. We start with a connection object, conn, and some convenience functions introduced along the way that help us simulate time passing and work being done [3].
Obtaining the queue
First, how do we get the queue? Your installation may have some conveniences. For instance, the Zope 3 configuration described below makes it possible to get the primary queue with an adaptation call like zc.async.interfaces.IQueue(a_persistent_object_with_db_connection).
But failing that, queues are always expected to be in a zc.async.queue.Queues mapping found off the ZODB root in a key defined by the constant zc.async.interfaces.KEY.
>>> import zc.async.interfaces
>>> zc.async.interfaces.KEY
'zc.async'
>>> root = conn.root()
>>> queues = root[zc.async.interfaces.KEY]
>>> import zc.async.queue
>>> isinstance(queues, zc.async.queue.Queues)
True
As the name implies, queues is a collection of queues. As discussed later, it’s possible to have multiple queues, as a tool to distribute and control work. We will assume a convention of a queue being available in the ‘’ (empty string).
>>> queues.keys()
['']
>>> queue = queues['']
queue.put
Now we want to actually get some work done. The simplest case is straightforward: pass a persistable callable to the queue's put method and commit the transaction.
>>> def send_message():
...     print "imagine this sent a message to another machine"
>>> job = queue.put(send_message)
>>> import transaction
>>> transaction.commit()
Note that this won’t really work in an interactive session: the callable needs to be picklable, as discussed above, so send_message would need to be a module global, for instance.
The put returned a job. Now we need to wait for the job to be performed. We would normally do this by really waiting. For our examples, we will use a helper method on the testing reactor to wait_for the job to be completed.
>>> reactor.wait_for(job)
imagine this sent a message to another machine
We also could have used the method of a persistent object. Here’s another quick example.
First we define a simple persistent.Persistent subclass and put an instance of it in the database [4].
>>> import persistent
>>> class Demo(persistent.Persistent):
...     counter = 0
...     def increase(self, value=1):
...         self.counter += value
...
>>> root['demo'] = Demo()
>>> transaction.commit()
Now we can put the demo.increase method in the queue.
>>> root['demo'].counter
0
>>> job = queue.put(root['demo'].increase)
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> root['demo'].counter
1
The method was called, and the persistent object modified!
To reiterate, only pickleable callables such as global functions and the methods of persistent objects can be used. This rules out, for instance, lambdas and other functions created dynamically. As we’ll see below, the job instance can help us out there somewhat by offering closure-like features.
queue.pull and queue.remove
If you have put a job into a queue and it has not yet been claimed, you can cancel it by pulling or removing it from the queue.
The pull method removes the first job, or takes an integer index.
>>> len(queue)
0
>>> job1 = queue.put(send_message)
>>> job2 = queue.put(send_message)
>>> len(queue)
2
>>> job1 is queue.pull()
True
>>> list(queue) == [job2]
True
>>> job1 is queue.put(job1)
True
>>> list(queue) == [job2, job1]
True
>>> job1 is queue.pull(-1)
True
>>> job2 is queue.pull()
True
>>> len(queue)
0
The remove method removes the specific given job.
>>> job1 = queue.put(send_message)
>>> job2 = queue.put(send_message)
>>> len(queue)
2
>>> queue.remove(job1)
>>> list(queue) == [job2]
True
>>> job1 is queue.put(job1)
True
>>> list(queue) == [job2, job1]
True
>>> queue.remove(job1)
>>> list(queue) == [job2]
True
>>> queue.remove(job2)
>>> len(queue)
0
Scheduled Calls
When using put, you can also pass a datetime.datetime to schedule a call. A datetime without a timezone is considered to be in the UTC timezone.
>>> t = transaction.begin()
>>> import datetime
>>> import pytz
>>> datetime.datetime.now(pytz.UTC)
datetime.datetime(2006, 8, 10, 15, 44, 33, 211, tzinfo=<UTC>)
>>> job = queue.put(
...     send_message, begin_after=datetime.datetime(
...         2006, 8, 10, 15, 56, tzinfo=pytz.UTC))
>>> job.begin_after
datetime.datetime(2006, 8, 10, 15, 56, tzinfo=<UTC>)
>>> transaction.commit()
>>> reactor.wait_for(job, attempts=2) # +5 virtual seconds
TIME OUT
>>> reactor.wait_for(job, attempts=2) # +5 virtual seconds
TIME OUT
>>> datetime.datetime.now(pytz.UTC)
datetime.datetime(2006, 8, 10, 15, 44, 43, 211, tzinfo=<UTC>)
>>> zc.async.testing.set_now(datetime.datetime(
...     2006, 8, 10, 15, 56, tzinfo=pytz.UTC))
>>> reactor.wait_for(job)
imagine this sent a message to another machine
>>> datetime.datetime.now(pytz.UTC) >= job.begin_after
True
If you set a time that has already passed, it will be run as if it had been set to run as soon as possible [5]…unless the job has already timed out, in which case the job fails with an abort [6].
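For instance, to schedule a call for five minutes in the future, something like this sketch would do (it is not part of the doctest run here; begin_after is the same keyword used above):

    import datetime
    import pytz

    queue.put(send_message,
              begin_after=datetime.datetime.now(pytz.UTC) +
                          datetime.timedelta(minutes=5))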
The queue’s put method is the essential API. pull is used rarely. Other methods are used to introspect, but are not needed for basic usage.
But what is that result of the put call in the examples above? A job? What do you do with that?
Jobs
Overview
A call to put returns an IJob. The job represents the pending result. This object has a lot of functionality that's explored in other documents in this package, and demonstrated a bit below, but here's a summary.
You can introspect, and even modify, the call and its arguments.
You can specify that the job should be run serially with others of a given identifier.
You can specify other calls that should be made on the basis of the result of this call.
You can persist a reference to it, and periodically (after syncing your connection with the database, which happens whenever you begin or commit a transaction) check its status to see if it is equal to zc.async.interfaces.COMPLETED. When it is, the call has run to completion, either to success or an exception.
You can look at the result of the call (once COMPLETED). It might be the result you expect, or a zc.twist.Failure, a subclass of twisted.python.failure.Failure, which is a way to safely communicate exceptions across connections and machines and processes.
Results
So here’s a simple story. What if you want to get a result back from a call? Look at the job.result after the call is COMPLETED.
>>> def imaginaryNetworkCall():
...     # let's imagine this makes a network call...
...     return "200 OK"
...
>>> job = queue.put(imaginaryNetworkCall)
>>> print job.result
None
>>> job.status == zc.async.interfaces.PENDING
True
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> t = transaction.begin()
>>> job.result
'200 OK'
>>> job.status == zc.async.interfaces.COMPLETED
True
Closures
What's more, you can pass a Job to the put call. This means that you aren't constrained to argumentless calls performed asynchronously: you can pass a job with a call, arguments, and keyword arguments, effectively a kind of closure. Here's a quick example. We'll use the demo object, and its increase method, that we introduced above, but this time we'll include some arguments [7].
With positional arguments:
>>> t = transaction.begin()
>>> job = queue.put(
...     zc.async.job.Job(root['demo'].increase, 5))
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> t = transaction.begin()
>>> root['demo'].counter
6
With keyword arguments (value):
>>> job = queue.put(
...     zc.async.job.Job(root['demo'].increase, value=10))
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> t = transaction.begin()
>>> root['demo'].counter
16
Note that arguments to these jobs can be any persistable object.
Failures
What happens if a call raises an exception? The return value is a Failure.
>>> def I_am_a_bad_bad_function():
...     return foo + bar
...
>>> job = queue.put(I_am_a_bad_bad_function)
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> t = transaction.begin()
>>> job.result
<zc.twist.Failure exceptions.NameError>
Failures can provide useful information such as tracebacks.
>>> print job.result.getTraceback()
... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
Traceback (most recent call last):
...
exceptions.NameError: global name 'foo' is not defined
<BLANKLINE>
Callbacks
You can register callbacks to handle the result of a job, whether a Failure or another result.
Note that, unlike callbacks on a Twisted deferred, these callbacks do not change the result of the original job. Since callbacks are jobs, you can chain results, but generally callbacks for the same job all get the same result as input.
Note too that a callback is not guaranteed to be processed on the same machine as the main call. Moreover, some of the local functions, discussed below, will not work as desired within a callback.
Here’s a simple example of reacting to a success.
>>> def I_scribble_on_strings(string):
...     return string + ": SCRIBBLED"
...
>>> job = queue.put(imaginaryNetworkCall)
>>> callback = job.addCallback(I_scribble_on_strings)
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> job.result
'200 OK'
>>> callback.result
'200 OK: SCRIBBLED'
Here’s a more complex example of handling a Failure, and then chaining a subsequent callback.
>>> def I_handle_NameErrors(failure):
...     failure.trap(NameError) # see twisted.python.failure.Failure docs
...     return 'I handled a name error'
...
>>> job = queue.put(I_am_a_bad_bad_function)
>>> callback1 = job.addCallbacks(failure=I_handle_NameErrors)
>>> callback2 = callback1.addCallback(I_scribble_on_strings)
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> job.result
<zc.twist.Failure exceptions.NameError>
>>> callback1.result
'I handled a name error'
>>> callback2.result
'I handled a name error: SCRIBBLED'
Advanced Techniques and Tools
Important
The job and its functionality described above are the core zc.async tools.
The following are advanced techniques and tools of various complexities. You can use zc.async very productively without ever understanding or using them. If they do not make sense to you now, just move on and revisit them when you need them.
zc.async.local
Jobs always run their callables in a thread, within the context of a connection to the ZODB. The callables have access to five special thread-local functions if they need them for special uses. These are available off of zc.async.local.
- zc.async.local.getJob()
The getJob function can be used to examine the job, to get a connection off of _p_jar, to get the queue into which the job was put, or other uses.
- zc.async.local.getQueue()
The getQueue function can be used to examine the queue, to put another task into the queue, or other uses. It is sugar for zc.async.local.getJob().queue.
- zc.async.local.setLiveAnnotation(name, value, job=None)
The setLiveAnnotation tells the agent to set an annotation on a job, by default the current job, in another connection. This makes it possible to send messages about progress or for coordination while in the middle of other work.
As a simple rule, only send immutable objects like strings or numbers as values [8].
- zc.async.local.getLiveAnnotation(name, default=None, timeout=0, poll=1, job=None)
The getLiveAnnotation tells the agent to get an annotation for a job, by default the current job, from another connection. This makes it possible to send messages about progress or for coordination while in the middle of other work.
As a simple rule, only ask for annotation values that will be immutable objects like strings or numbers [9].
If the timeout argument is set to a positive float or int, the function will wait at least that number of seconds until an annotation of the given name is available. Otherwise, it will return the default if the name is not present in the annotations. The poll argument specifies approximately how often to poll for the annotation, in seconds (to be more precise, a subsequent poll will be min(poll, remaining seconds until timeout) seconds away).
- zc.async.local.getReactor()
The getReactor function returns the job’s dispatcher’s reactor. The getLiveAnnotation and setLiveAnnotation functions use this, along with the zc.twist package, to work their magic; if you are feeling adventurous, you can do the same.
- zc.async.local.getDispatcher()
The getDispatcher function returns the job’s dispatcher. This might be used to analyze its non-persistent poll data structure, for instance (described later in configuration discussions).
Let's give three of those a whirl. We will write a function that examines the job's status while it is being called, records that status in an annotation, and then waits for our flag before finishing.
>>> def annotateStatus():
...     zc.async.local.setLiveAnnotation(
...         'zc.async.test.status',
...         zc.async.local.getJob().status)
...     zc.async.local.getLiveAnnotation(
...         'zc.async.test.flag', timeout=5)
...     return 42
...
>>> job = queue.put(annotateStatus)
>>> transaction.commit()
>>> import time
>>> def wait_for_annotation(job, key):
...     reactor.time_flies(dispatcher.poll_interval) # starts thread
...     for i in range(10):
...         while reactor.time_passes():
...             pass
...         transaction.begin()
...         if key in job.annotations:
...             break
...         time.sleep(0.1)
...     else:
...         print 'Timed out' + repr(dict(job.annotations))
...
>>> wait_for_annotation(job, 'zc.async.test.status')
>>> job.annotations['zc.async.test.status'] == (
...     zc.async.interfaces.ACTIVE)
True
>>> job.status == zc.async.interfaces.ACTIVE
True
>>> job.annotations['zc.async.test.flag'] = True
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> job.result
42
The getReactor and getDispatcher functions are for advanced use cases and are not explored further here.
Job Quotas
One class of asynchronous jobs is ideally serialized. For instance, you may want to reduce or eliminate the chance of conflict errors when updating a text index. One way to get this kind of serialization is to use the quota_names attribute of the job.
For example, let's first show two non-serialized jobs running at the same time, and then two serialized jobs created at the same time. The first part of the example does not use quota_names, to show a contrast.
For our parallel jobs, we’ll do something that would create a deadlock if they were serial. Notice that we are mutating the job arguments after creation to accomplish this, which is supported.
>>> def waitForParallel(other):
...     zc.async.local.setLiveAnnotation(
...         'zc.async.test.flag', True)
...     zc.async.local.getLiveAnnotation(
...         'zc.async.test.flag', job=other, timeout=0.4, poll=0)
...
>>> job1 = queue.put(waitForParallel)
>>> job2 = queue.put(waitForParallel)
>>> job1.args.append(job2)
>>> job2.args.append(job1)
>>> transaction.commit()
>>> reactor.wait_for(job1, job2)
>>> job1.status == zc.async.interfaces.COMPLETED
True
>>> job2.status == zc.async.interfaces.COMPLETED
True
>>> job1.result is job2.result is None
True
On the other hand, for our serial jobs, we’ll do something that would fail if it were parallel. We’ll rely on quota_names.
Quotas verge on configuration, which is not what this section is about, because they must be configured on the queue. However, they also affect usage, so we show them here.
>>> def pause(other):
...     zc.async.local.setLiveAnnotation(
...         'zc.async.test.flag', True)
...     res = zc.async.local.getLiveAnnotation(
...         'zc.async.test.flag', timeout=0.4, poll=0.1, job=other)
...
>>> job1 = queue.put(pause)
>>> job2 = queue.put(imaginaryNetworkCall)
You can’t put a name in quota_names unless the quota has been created in the queue.
>>> job1.quota_names = ('test',)
Traceback (most recent call last):
...
ValueError: ('unknown quota name', 'test')
>>> queue.quotas.create('test')
>>> job1.quota_names = ('test',)
>>> job2.quota_names = ('test',)
Now we can see the two jobs being performed serially.
>>> job1.args.append(job2)
>>> transaction.commit()
>>> reactor.time_flies(dispatcher.poll_interval)
1
>>> for i in range(10):
...     t = transaction.begin()
...     if job1.status == zc.async.interfaces.ACTIVE:
...         break
...     time.sleep(0.1)
... else:
...     print 'TIME OUT'
...
>>> job2.status == zc.async.interfaces.PENDING
True
>>> job2.annotations['zc.async.test.flag'] = False
>>> transaction.commit()
>>> reactor.wait_for(job1)
>>> reactor.wait_for(job2)
>>> print job1.result
None
>>> print job2.result
200 OK
Quotas can be configured for limits greater than one at a time, if desired. This may be valuable when a needed resource is only available in limited numbers at a time.
Note that, while quotas are valuable tools for doing serialized work such as updating a text index, other optimization features sometimes useful for this sort of task, such as collapsing similar jobs, are not provided directly by this package. This functionality could be trivially built on top of zc.async, however [12].
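For instance, here is a sketch of one way to collapse similar jobs, assuming your application funnels all puts for a given kind of task through a helper like this; is_equivalent is a hypothetical application-level comparison of callables and arguments:

    def put_unless_pending(queue, job):
        # a sketch: if an equivalent job is already waiting (unclaimed)
        # in the queue, return it instead of adding a duplicate.
        # is_equivalent is a hypothetical comparison function.
        for pending in queue:
            if is_equivalent(pending, job):
                return pending
        return queue.put(job)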
Returning Jobs
Our examples so far have done work directly. What if the job wants to orchestrate other work? One way this can be done is to return another job. The result of the inner job will be the result of the first job once the inner job is finished. This approach can be used to break up the work of long running processes; to be more cooperative to other jobs; and to make parts of a job that can be parallelized available to more workers.
Serialized Work
First, consider a serialized example. This simple pattern is one approach.
>>> def second_job(value):
...     # imagine a lot of work goes on...
...     return value * 2
...
>>> def first_job():
...     # imagine a lot of work goes on...
...     intermediate_value = 21
...     queue = zc.async.local.getJob().queue
...     return queue.put(zc.async.job.Job(
...         second_job, intermediate_value))
...
>>> job = queue.put(first_job)
>>> transaction.commit()
>>> reactor.wait_for(job, attempts=3)
TIME OUT
>>> len(agent)
1
>>> reactor.wait_for(job, attempts=3)
>>> job.result
42
The job is now out of the agent.
>>> len(agent)
0
The second_job could also have returned a job, allowing for additional legs. Once the last job returns a real result, it will cascade through the past jobs back up to the original one.
A different approach could have used callbacks. Using callbacks can be somewhat more complicated to follow, but can allow for a cleaner separation of code: dividing code that does work from code that orchestrates the jobs. The serial helper function in the job module uses this pattern. Here’s a quick example of the helper function [13].
>>> def job_zero():
...     return 0
...
>>> def job_one():
...     return 1
...
>>> def job_two():
...     return 2
...
>>> def postprocess(zero, one, two):
...     return zero.result, one.result, two.result
...
>>> job = queue.put(zc.async.job.serial(job_zero, job_one, job_two,
...                                     postprocess=postprocess))
>>> transaction.commit()
>>> wait_repeatedly()
... # doctest: +ELLIPSIS
TIME OUT...
>>> job.result
(0, 1, 2)
The parallel example we use below follows a similar pattern.
Parallelized Work
Now how can we set up parallel jobs? There are other good ways, but we can describe one way that avoids potential problems with the current-as-of-this-writing (ZODB 3.8 and trunk) default optimistic MVCC serialization behavior in the ZODB. The solution uses callbacks, which also allows us to cleanly divide the “work” code from the synchronization code, as described in the previous paragraph.
First, we’ll define the jobs that do work. job_A, job_B, and job_C will be jobs that can be done in parallel, and postprocess will be a function that assembles the job results for a final result.
>>> def job_A():
...     # imaginary work...
...     return 7
...
>>> def job_B():
...     # imaginary work...
...     return 14
...
>>> def job_C():
...     # imaginary work...
...     return 21
...
>>> def postprocess(*jobs):
...     # this callable represents one that needs to wait for the
...     # parallel jobs to be done before it can process them and return
...     # the final result
...     return sum(job.result for job in jobs)
...
This can be handled by a convenience function, parallel, that will arrange everything for you.
>>> job = queue.put(zc.async.job.parallel(
...     job_A, job_B, job_C, postprocess=postprocess))
>>> transaction.commit()
Now we just wait for the result.
>>> wait_repeatedly()
... # doctest: +ELLIPSIS
TIME OUT...
>>> job.result
42
Ta-da!
Now, how did this work? Let’s look at a simple implementation directly. We’ll use a slightly different postprocess, that expects results directly rather than the jobs.
>>> def postprocess(*results):
...     # this callable represents one that needs to wait for the
...     # parallel jobs to be done before it can process them and return
...     # the final result
...     return sum(results)
...
This code works with jobs to get everything done. Note, in the callback function, that mutating the same object we are checking (job.args) is the way we are enforcing necessary serializability with MVCC turned on.
>>> def callback(job, result):
...     job.args.append(result)
...     if len(job.args) == 3: # all results are in
...         zc.async.local.getJob().queue.put(job)
...
>>> def main_job():
...     job = zc.async.job.Job(postprocess)
...     queue = zc.async.local.getJob().queue
...     for j in (job_A, job_B, job_C):
...         queue.put(j).addCallback(
...             zc.async.job.Job(callback, job))
...     return job
...
That may be a bit mind-blowing at first. The trick to catch here is that, because main_job returns a job, the result of that job will become the result of main_job once the returned (postprocess) job is done.
Now we’ll put this in and let it cook.
>>> job = queue.put(main_job)
>>> transaction.commit()
>>> wait_repeatedly()
... # doctest: +ELLIPSIS
TIME OUT...
>>> job.result
42
Once again, ta-da!
For real-world usage, you’d also probably want to deal with the possibility of one or more of the jobs generating a Failure, among other edge cases. The parallel function introduced above helps you handle this by returning jobs, rather than results, so you can analyze what went wrong and try to handle it.
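For instance, a postprocess along these lines (a sketch) could examine each job before combining results; recall from the Failures section that failed jobs carry a zc.twist.Failure as their result:

    import zc.twist

    def careful_postprocess(*jobs):
        # a sketch: surface the first failure instead of summing over it
        for j in jobs:
            if isinstance(j.result, zc.twist.Failure):
                return j.result  # or log, retry, and so on
        return sum(j.result for j in jobs)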
Returning Deferreds
What if you want to do work that doesn’t require a ZODB connection? You can also return a Twisted deferred (twisted.internet.defer.Deferred). When you then callback the deferred with the eventual result, the agent will be responsible for setting that value on the original deferred and calling its callbacks. This can be a useful trick for making network calls using Twisted or zc.ngi, for instance.
>>> def imaginaryNetworkCall2(deferred):
...     # make a network call...
...     deferred.callback('200 OK')
...
>>> import twisted.internet.defer
>>> import threading
>>> def delegator():
...     deferred = twisted.internet.defer.Deferred()
...     t = threading.Thread(
...         target=imaginaryNetworkCall2, args=(deferred,))
...     t.run() # runs synchronously here; a real use would call t.start()
...     return deferred
...
>>> job = queue.put(delegator)
>>> transaction.commit()
>>> reactor.wait_for(job)
>>> job.result
'200 OK'
Conclusion
This concludes our discussion of zc.async usage. The next section shows how to configure zc.async without Zope 3 [14].
Configuration
This section discusses setting up zc.async without Zope 3. Since Zope 3 is ill-defined, we will be more specific: this describes setting up zc.async without ZCML, without any zope.app packages, and with as few dependencies as possible. A casual way of describing the dependencies is “ZODB, Twisted, and zope.component,” though we directly depend on some smaller packages and indirectly on others [15].
You may have one or two kinds of configurations for your software using zc.async. The simplest approach is to have all processes able both to put items in queues, and to perform them with a dispatcher. You can then use on-the-fly ZODB configuration to determine what jobs, if any, each process’ dispatcher performs. If a dispatcher has no agents in a given queue, as we’ll discuss below, the dispatcher will not perform any job for that queue.
However, if you want to create some processes that can only put items in a queue, and do not have a dispatcher at all, that is easy to do. We’ll call this a “client” process, and the full configuration a “client/server process”. As you might expect, the configuration of a client process is a subset of the configuration of the client/server process.
We will first describe setting up a client, non-dispatcher process, in which you only can put items in a zc.async queue; and then describe setting up a dispatcher client/server process that can be used both to request and to perform jobs.
Configuring a Client Process
Generally, zc.async configuration has four basic parts: component registrations, ZODB setup, ZODB configuration, and process configuration. For a client process, we’ll discuss required component registrations; ZODB setup; minimal ZODB configuration; process configuration; and then circle back around for some optional component registrations.
Required Component Registrations
The required registrations can be installed for you by the zc.async.configure.base function. Most other examples in this package, such as those in the Usage section, use this in their test setup.
Again, for a quick start, you might just want to use the helper zc.async.configure.base function, and move on to the Required ZODB Set Up section below.
Here, though, we will go over each required registration to briefly explain what they are.
You must have three adapter registrations: IConnection to ITransactionManager, IPersistent to IConnection, and IPersistent to ITransactionManager.
The zc.twist package provides all of these adapters. However, zope.app.keyreference also provides a version of the connection adapter that is identical or very similar, and that should work fine if you are already using that package in your application.
>>> import zc.twist
>>> import zope.component
>>> zope.component.provideAdapter(zc.twist.transactionManager)
>>> zope.component.provideAdapter(zc.twist.connection)
>>> import ZODB.interfaces
>>> zope.component.provideAdapter(
...     zc.twist.transactionManager, adapts=(ZODB.interfaces.IConnection,))
We also need to be able to adapt functions and methods to jobs. The zc.async.job.Job class is the expected implementation.
>>> import types
>>> import zc.async.interfaces
>>> import zc.async.job
>>> zope.component.provideAdapter(
...     zc.async.job.Job,
...     adapts=(types.FunctionType,),
...     provides=zc.async.interfaces.IJob)
>>> zope.component.provideAdapter(
...     zc.async.job.Job,
...     adapts=(types.MethodType,),
...     provides=zc.async.interfaces.IJob)
>>> zope.component.provideAdapter( # optional, rarely used
...     zc.async.job.Job,
...     adapts=(zc.twist.METHOD_WRAPPER_TYPE,),
...     provides=zc.async.interfaces.IJob)
The queue looks for the UUID utility to set the assignerUUID job attribute, and may want to use it to optionally filter jobs during claim in the future. Also, the dispatcher will look for a UUID utility if a UUID is not specifically provided to its constructor.
>>> from zc.async.instanceuuid import UUID
>>> zope.component.provideUtility(
...     UUID, zc.async.interfaces.IUUID, '')
The UUID we register here is a UUID of the instance, which is expected to uniquely identify the process when in production. It is stored in the file specified by the ZC_ASYNC_UUID environment variable (or in os.path.join(os.getcwd(), 'uuid.txt') if that is not specified, for easy initial experimentation with the package).
>>> import uuid
>>> import os
>>> f = open(os.environ["ZC_ASYNC_UUID"])
>>> uuid_hex = f.readline().strip()
>>> f.close()
>>> uuid = uuid.UUID(uuid_hex)
>>> UUID == uuid
True
The uuid.txt file is intended to stay in the instance home as a persistent identifier.
Again, all of the required registrations above can be accomplished quickly with zc.async.configure.base.
Required ZODB Set Up
On a basic level, zc.async needs a setup that supports good conflict resolution. Most or all production ZODB storages now have the necessary APIs to support MVCC.
Of course, if you want to run multiple processes, you need ZEO. You should also then make sure that your ZEO server installation has all the code that includes conflict resolution, such as zc.queue, because, as of this writing, conflict resolution happens in the ZEO server, not in clients.
A more subtle decision is whether to use multiple databases. The zc.async dispatcher can generate a lot of database churn. It may be wise to put the queue in a separate database from your content database(s).
The downsides to this option include the fact that you must be careful to specify to which database objects belong; and that broken cross-database references are not handled gracefully in the ZODB as of this writing.
We will use multiple databases for our example here, because we are trying to demonstrate production-quality examples. We will show this with a pure-Python approach, rather than the ZConfig approach usually used by Zope. If you know ZConfig, that will be a reasonable approach as well; see zope.app.appsetup for how Zope uses ZConfig to set up multidatabases.
In our example, we create two file storages. In production, you would be more likely to use ZEO; hooking ClientStorage up instead of FileStorage should be straightforward.
>>> databases = {}
>>> import ZODB.FileStorage
>>> storage = ZODB.FileStorage.FileStorage(
...     'main.fs', create=True)
>>> async_storage = ZODB.FileStorage.FileStorage(
...     'async.fs', create=True)
>>> from ZODB.DB import DB
>>> databases[''] = db = DB(storage)
>>> databases['async'] = async_db = DB(async_storage)
>>> async_db.databases = db.databases = databases
>>> db.database_name = ''
>>> async_db.database_name = 'async'
>>> conn = db.open()
>>> root = conn.root()
ZODB Configuration
A Queue
All we must have for a client to be able to put jobs in a queue is … a queue.
For a quick start, the zc.async.subscribers module provides a subscriber to a DatabaseOpened event that does the right dance. See multidb_queue_installer and queue_installer in that module, and you can see that in use in Configuration with Zope 3. For now, though, we’re taking things step by step and explaining what’s going on.
Dispatchers look for queues in a mapping off the root of the database in a key defined as a constant: zc.async.interfaces.KEY. This mapping should generally be a zc.async.queue.Queues object.
If we were not using a multi-database for our example, we could simply install the queues mapping with this line: root[zc.async.interfaces.KEY] = zc.async.queue.Queues(). We will need something a bit more baroque. We will add the queues mapping to the ‘async’ database, and then make it available in the main database (‘’) with the proper key.
>>> conn2 = conn.get_connection('async')
>>> import zc.async.queue
>>> queues = conn2.root()['mounted_queues'] = zc.async.queue.Queues()
Note that the ‘mounted_queues’ key in the async database is arbitrary: what we care about is the key in the database that the dispatcher will see.
Now we add the object explicitly to conn2, so that the ZODB will know the “real” database in which the object lives, even though it will be also accessible from the main database.
>>> conn2.add(queues)
>>> root[zc.async.interfaces.KEY] = queues
>>> import transaction
>>> transaction.commit()
Now we need to put a queue in the queues collection. We can have more than one, as discussed below, but we suggest a convention of the primary queue being available in a key of ‘’ (empty string).
>>> queue = queues[''] = zc.async.queue.Queue()
>>> transaction.commit()
Quotas
We touched on quotas in the usage section. Some jobs will need to access resources that are shared across processes. A central data structure such as an index in the ZODB is a prime example, but other examples might include a network service that only allows a certain number of concurrent connections. These scenarios can be helped by quotas.
Quotas are demonstrated in the usage section. For configuration, you should know these characteristics:
you cannot add a job with a quota name that is not defined in the queue [16];
you cannot add a quota name to a job in a queue if the quota name is not defined in the queue [17];
you can create and remove quotas on the queue [18];
you can remove quotas if pending jobs have their quota names–the quota name is then ignored [19];
quotas default to a size of 1 [20];
this can be changed at creation or later [21]; and
decreasing the size of a quota while the old quota size is filled will not affect the currently running jobs [22].
Multiple Queues
Since we put our queues in a mapping of them, we can also create multiple queues. This can make some scenarios more convenient and simpler to reason about. For instance, while you might have agents filtering jobs as we describe above, it might be simpler to say that you have a queue for one kind of job–say, processing a video file or an audio file–and a queue for other kinds of jobs. Then it is easy and obvious to set up simple FIFO agents as desired for different dispatchers. The same kind of logic could be accomplished with agents, but it is easier to picture the multiple queues.
Another use case for multiple queues might be for specialized queues, like ones that broadcast jobs. You could write a queue subclass that broadcasts copies of jobs they get to all dispatchers, aggregating results. This could be used to send “events” to all processes, or to gather statistics on certain processes, and so on.
Generally, any time the application wants to be able to assert a kind of job rather than letting the agents decide what to do, having separate queues is a reasonable tool.
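Adding a second queue is the same dance shown above; for example, a sketch with a hypothetical 'video' queue name and a hypothetical process_video callable:

    import transaction
    import zc.async.queue

    queues['video'] = zc.async.queue.Queue()
    transaction.commit()

    # clients then pick the queue by name when they put jobs
    job = queues['video'].put(process_video)  # hypothetical callable
    transaction.commit()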
Process Configuration
Daemonization
You often want to daemonize your software, so that you can restart it if there's a problem, keep track of it, monitor it, and so on. ZDaemon (http://pypi.python.org/pypi/zdaemon) and Supervisor (http://supervisord.org/) are two fairly simple-to-use ways of doing this for both client and client/server processes. If your main application can be packaged as a setuptools distribution (egg, source release, or even development egg), then you can run your main application as a zc.async client, and run your dispatchers with a separate zc.async-only main loop that simply includes your main application as a dependency, so the necessary software is around. You may have to do a bit more configuration on the client/server side to mimic global registries such as zope.component registrations between the client and the client/servers, but this shouldn't be too bad.
UUID File Location
As discussed above, the instanceuuid module will look for an environment variable ZC_ASYNC_UUID to find the file name to use, and failing that will use os.path.join(os.getcwd(), 'uuid.txt'). It's worth noting that daemonization tools such as ZDaemon and Supervisor (3 or greater) make setting environment values for child processes an easy (and repeatable) configuration-file setting.
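For instance, with Supervisor, a program stanza along these lines would do (the program name and paths are illustrative):

    [program:zc-async-worker]
    command = /path/to/instance/bin/run
    environment = ZC_ASYNC_UUID="/path/to/instance/uuid.txt"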
Optional Component Registrations for a Client Process
The only optional component registration potentially valuable for client instances that only put jobs in the queue is an adapter from persistent objects to a queue. The zc.async.queue.getDefaultQueue adapter provides this, returning the queue named '' (empty string). Since that's what we have from the ZODB Configuration section above, we'll register it. Writing your own adapter is trivial, as you can see if you look at the implementation of this function.
>>> zope.component.provideAdapter(zc.async.queue.getDefaultQueue)
>>> zc.async.interfaces.IQueue(root) is queue
True
Configuring a Client/Server Process
Configuring a client/server process–something that includes a running dispatcher–means doing everything described above, plus a bit more. You need to set up and start a reactor and dispatcher; configure agents as desired to get the dispatcher to do some work; and optionally configure logging.
For a quick start, the zc.async.subscribers module has some conveniences to start a threaded reactor and dispatcher, and to install agents. You might want to look at those to get started. They are also used in the Zope 3 configuration (README_3). Meanwhile, this document continues to go step-by-step instead, to try and explain the components and configuration.
Even though it seems reasonable to first start a dispatcher and then set up its agents, we’ll first define a subscriber to create an agent. As we’ll see below, the dispatcher fires an event when it registers with a queue, and another when it activates the queue. These events give you the opportunity to register subscribers to add one or more agents to a queue, to tell the dispatcher what jobs to perform. zc.async.agent.addMainAgentActivationHandler is a reasonable starter: it adds a single agent named ‘main’ if one does not exist. The agent has a simple indiscriminate FIFO policy for the queue. If you want to write your own subscriber, look at this, or at the more generic subscriber in the zc.async.subscribers module.
Agents are an important part of the ZODB configuration, and so are described more in depth below.
>>> import zc.async.agent
>>> zope.component.provideHandler(
...     zc.async.agent.addMainAgentActivationHandler)
This subscriber is registered for the IDispatcherActivated event; another approach might use the IDispatcherRegistered event.
Starting the Dispatcher
Now we can start the reactor, and start the dispatcher. In some applications this may be done with an event subscriber to DatabaseOpened, as is done in zc.async.subscribers. Here, we will do it inline.
Any object that conforms to the specification of zc.async.interfaces.IReactor will be usable by the dispatcher. For our example, we will use our own instance of the Twisted select-based reactor running in a separate thread. This is separate from the Twisted reactor installed in twisted.internet.reactor, and so this approach can be used with an application that does not otherwise use Twisted (for instance, a Zope application using the “classic” zope publisher).
The testing module also has a reactor on which the Usage section relies, if you would like to see a minimal contract.
Configuring the basics is fairly simple, as we’ll see in a moment. The trickiest part is to handle signals cleanly. It is also optional! The dispatcher will eventually figure out that there was not a clean shut down before and take care of it. Here, though, essentially as an optimization, we install signal handlers in the main thread using reactor._handleSignals. reactor._handleSignals may work in some real-world applications, but if your application already needs to handle signals you may need a more careful approach. Again, see zc.async.subscribers for some options you can explore.
>>> import twisted.internet.selectreactor
>>> reactor = twisted.internet.selectreactor.SelectReactor()
>>> reactor._handleSignals()
Now we are ready to instantiate our dispatcher.
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.Dispatcher(db, reactor)
Notice it has the uuid defined in instanceuuid.
>>> dispatcher.UUID == UUID
True
Now we can start the reactor and the dispatcher in a thread.
>>> import threading
>>> def start():
...     dispatcher.activate()
...     reactor.run(installSignalHandlers=0)
...
>>> thread = threading.Thread(target=start)
>>> thread.setDaemon(True)
>>> thread.start()
The dispatcher should be starting up now. Let’s wait for it to activate. We’re using a test convenience, get_poll, defined in the testing module.
>>> from zc.async.testing import get_poll
>>> poll = get_poll(dispatcher, 0)
We’re off! The events have been fired for registering and activating the dispatcher. Therefore, our subscriber to add our agent has fired.
We need to begin our transaction to synchronize our view of the database.
>>> t = transaction.begin()
We get the collection of dispatcher agents from the queue, using the UUID.
>>> dispatcher_agents = queue.dispatchers[UUID]
It has one agent–the one placed by our subscriber.
>>> dispatcher_agents.keys()
['main']
>>> agent = dispatcher_agents['main']
Now we have our agent! But…what is it [23]?
Agents
Agents are the way you control what a dispatcher’s worker threads do. They pick the jobs and assign them to their dispatcher when the dispatcher asks.
If a dispatcher does not have any agents in a given queue, it will not perform any tasks for that queue.
We currently have an agent that simply asks for the next available FIFO job. We are using an agent implementation that allows you to specify a callable to choose the job. That callable is now zc.async.agent.chooseFirst.
>>> agent.chooser is zc.async.agent.chooseFirst
True
Here’s the entire implementation of that function:
def chooseFirst(agent):
    return agent.queue.claim()
What would another agent do? Well, it might pass a filter function to claim. This function takes a job and returns a value evaluated as a boolean. For instance, let’s say we always wanted a certain number of threads available for working on a particular call; for the purpose of example, we’ll use operator.mul, though a more real-world example might be a network call or a particular call in your application.
>>> import operator
>>> def chooseMul(agent):
...     return agent.queue.claim(lambda job: job.callable is operator.mul)
...
Another variant would prefer operator.mul, but if one is not in the queue, it will take any.
>>> def preferMul(agent):
...     res = agent.queue.claim(lambda job: job.callable is operator.mul)
...     if res is None:
...         res = agent.queue.claim()
...     return res
...
Other approaches might look at the current jobs in the agent, or in the agent's dispatcher, and decide what jobs to prefer on that basis. The agent implementation is intended to support many such policies; one more sketch follows.
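For instance, here is a sketch of a chooser that declines to claim new work while the agent is already busy (recall that agents contain their currently active jobs, so len(agent) reflects them; the threshold of two is arbitrary):

    def chooseIfNotBusy(agent):
        # a sketch: leave jobs for other dispatchers while this agent
        # already has two or more active jobs
        if len(agent) >= 2:
            return None
        return agent.queue.claim()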
Let’s set up another agent, in addition to the chooseFirst one, that has the preferMul policy.
>>> agent2 = dispatcher_agents['mul'] = zc.async.agent.Agent(preferMul)
Another characteristic of agents is that they specify how many jobs they should pick at a time. The dispatcher actually adjusts the size of the ZODB connection pool to accommodate its agents’ size. The default is 3.
>>> agent.size
3
>>> agent2.size
3
We can change that at creation or later.
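For instance (a sketch; passing size as a keyword to the Agent constructor is an assumption based on the defaults above, and the 'big' agent name is hypothetical):

    agent2.size = 5  # change it later...
    # ...or specify it at creation
    dispatcher_agents['big'] = zc.async.agent.Agent(
        zc.async.agent.chooseFirst, size=5)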
Finally, it’s worth noting that agents contain the jobs that are currently worked on by the dispatcher, on their behalf; and have a completed collection of the more recent completed jobs, beginning with the most recently completed job.
Logging and Monitoring
Logs are sent to the zc.async.events log for big events, like startup and shutdown, and errors. Poll and job logs are sent to zc.async.trace. Configure the standard Python logging module as usual to send these logs where you need. Be sure to auto-rotate the trace logs.
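For instance, a sketch using the standard library (file names and sizes are illustrative):

    import logging
    import logging.handlers

    # big events: startup, shutdown, and errors
    logging.getLogger('zc.async.events').addHandler(
        logging.FileHandler('async-events.log'))

    # poll and job traces are high volume, so auto-rotate them
    logging.getLogger('zc.async.trace').addHandler(
        logging.handlers.RotatingFileHandler(
            'async-trace.log', maxBytes=10 * 1024 * 1024, backupCount=5))

    logging.getLogger('zc.async').setLevel(logging.INFO)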
The package supports monitoring using zc.z3monitor, but using this package includes more Zope 3 dependencies, so it is not included here. If you would like to use it, see monitor.txt in the package and our next section: Configuration with Zope 3. Otherwise, if you want to roll your own monitoring, glance at monitor.py–you’ll see that most of the heavy lifting for the monitor support is done in the dispatcher, so it should be pretty easy to hook up the basic data another way.
[14]

>>> reactor.stop()
[15] More specifically, as of this writing, these are the minimal egg dependencies (including indirect dependencies):
- pytz
A Python time zone library
- rwproperty
A small package of descriptor conveniences
- uuid
The uuid module included in Python 2.5
- zc.dict
A ZODB-aware dict implementation based on BTrees.
- zc.queue
A ZODB-aware queue
- zc.twist
Conveniences for working with Twisted and the ZODB
- twisted
The Twisted internet library.
- ZConfig
A general configuration package from the Zope project, used by the ZODB.
- zdaemon
A general daemon tool coming from the Zope project.
- ZODB3
The Zope Object Database.
- zope.bforest
Aggregations of multiple BTrees into a single dict-like structure, reasonable for rotating data structures, among other purposes.
- zope.component
A way to hook together code by contract.
- zope.deferredimport
A way to defer imports in Python packages, often to prevent circular import problems.
- zope.deprecation
A small framework for deprecating features.
- zope.event
An exceedingly small event framework that derives its power from zope.component.
- zope.i18nmessageid
A way to specify strings to be translated.
- zope.interface
A way to specify code contracts and other data structures.
- zope.proxy
A way to proxy other Python objects.
- zope.testing
Testing extensions and helpers.
The next section, Configuration With Zope 3, still tries to limit dependencies–we only rely on additional packages zc.z3monitor, simplejson, and zope.app.appsetup ourselves–but as of this writing zope.app.appsetup ends up dragging in a large chunk of zope.app.* packages. Hopefully that will be refactored in Zope itself, and our full Zope 3 configuration can benefit from the reduced indirect dependencies.
[16]

>>> import operator
>>> import zc.async.job
>>> job = zc.async.job.Job(operator.mul, 5, 2)
>>> job.quota_names = ['content catalog']
>>> job.quota_names
('content catalog',)
>>> queue.put(job)
Traceback (most recent call last):
...
ValueError: ('unknown quota name', 'content catalog')
>>> len(queue)
0
[17]

>>> job.quota_names = ()
>>> job is queue.put(job)
True
>>> job.quota_names = ('content catalog',)
Traceback (most recent call last):
...
ValueError: ('unknown quota name', 'content catalog')
>>> job.quota_names
()
[18]

>>> list(queue.quotas)
[]
>>> queue.quotas.create('testing')
>>> list(queue.quotas)
['testing']
>>> queue.quotas.remove('testing')
>>> list(queue.quotas)
[]
[19]

>>> queue.quotas.create('content catalog')
>>> job.quota_names = ('content catalog',)
>>> queue.quotas.remove('content catalog')
>>> job.quota_names
('content catalog',)
>>> job is queue.claim()
True
>>> len(queue)
0
[20]

>>> queue.quotas.create('content catalog')
>>> queue.quotas['content catalog'].size
1
[21]

>>> queue.quotas['content catalog'].size = 2
>>> queue.quotas['content catalog'].size
2
>>> queue.quotas.create('frobnitz account', size=3)
>>> queue.quotas['frobnitz account'].size
3
[22]

>>> job1 = zc.async.job.Job(operator.mul, 5, 2)
>>> job2 = zc.async.job.Job(operator.mul, 5, 2)
>>> job3 = zc.async.job.Job(operator.mul, 5, 2)
>>> job1.quota_names = job2.quota_names = job3.quota_names = (
...     'content catalog',)
>>> job1 is queue.put(job1)
True
>>> job2 is queue.put(job2)
True
>>> job3 is queue.put(job3)
True
>>> job1 is queue.claim()
True
>>> job2 is queue.claim()
True
>>> print queue.claim()
None
>>> quota = queue.quotas['content catalog']
>>> len(quota)
2
>>> list(quota) == [job1, job2]
True
>>> quota.filled
True
>>> quota.size = 1
>>> quota.filled
True
>>> print queue.claim()
None
>>> job1()
10
>>> print queue.claim()
None
>>> len(quota)
1
>>> list(quota) == [job2]
True
>>> job2()
10
>>> job3 is queue.claim()
True
>>> list(quota) == [job3]
True
>>> len(quota)
1
>>> job3()
10
>>> print queue.claim()
None
>>> len(queue)
0
>>> quota.clean()
>>> len(quota)
0
>>> quota.filled
False
We don’t want the live dispatcher for our demos, actually. See dispatcher.txt to see the live dispatcher actually in use. So, here we’ll stop the “real” reactor and switch to a testing one.
>>> reactor.callFromThread(reactor.stop)
>>> thread.join(3)
>>> assert not dispatcher.activated, 'dispatcher did not deactivate'
>>> import zc.async.testing
>>> reactor = zc.async.testing.Reactor()
>>> dispatcher.reactor = reactor
>>> dispatcher.activate()
>>> reactor.start()
Configuration with Zope 3
Our last main section can be the shortest yet, both because we’ve already introduced all of the main concepts, and because we will be leveraging conveniences to automate much of the configuration shown in the section discussing configuration without Zope 3.
Client Set Up
If you want to set up a client alone, without a dispatcher, include the egg in your setup.py, include the configure.zcml in your application's zcml, make sure you share the database in which the queues will be held, and make sure that either the zope.app.keyreference.persistent.connectionOfPersistent adapter is registered, or zc.twist.connection.
That should be it.
Client/Server Set Up
For a client/server combination, use zcml something like basic_dispatcher_policy.zcml, make sure you have access to the database with the queues, configure logging and monitoring as desired, configure the ZC_ASYNC_UUID environment variable in zdaemon.conf if you are in production, and start up! Getting started is really pretty easy. You can even start a dispatcher-only version by not starting any servers in zcml.
In comparison to the non-Zope 3 usage, an important difference in your setup.py is that, if you want the full set up described below, including zc.z3monitor, you’ll need to specify “zc.async [z3]” as the desired package in your install_requires, as opposed to just “zc.async” [24].
We’ll look at this by making a zope.conf-alike and a site.zcml-alike. We’ll need a place to put some files, so we’ll use a temporary directory. This, and the comments in the files that we set up, are the primary differences between our examples and a real set up.
We’ll do this in two versions. The first version uses a single database, as you might do to get started quickly, or for a small site. The second version has one database for the main application, and one database for the async data, as will be more appropriate for typical production usage.
Two Database Set Up
Even though it is a bit more trouble to set up, large-scale production usage will probably prefer to use this approach, over the shared single database described above.
For our zope.conf, we only need one additional stanza to the one seen above:
<zodb async>
  <filestorage>
    create true
    path REPLACE_THIS_WITH_PATH_TO_STORAGE
  </filestorage>
</zodb>
(You would replace “REPLACE_THIS_WITH_PATH_TO_STORAGE” with the path to the storage file.)
As before, you will probably prefer to use ZEO rather than FileStorage in production.
The zdaemon.conf instructions are the same: set the ZC_ASYNC_UUID environment variable properly in the zdaemon.conf file.
For our site.zcml, the only difference is that we use the multidb_dispatcher_policy.zcml file rather than the basic_dispatcher_policy.zcml file.
If you want to change policy, change "multidb_dispatcher_policy.zcml" to "dispatcher.zcml" in the example above and register your own replacements for the policy bits found in multidb_dispatcher_policy.zcml. You'll see that most of that policy comes from code in subscribers.py, which can be adjusted easily.
If we process the files described above, and wait for a poll, we’ve got a working set up [27].
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> import pprint
>>> pprint.pprint(get_poll(dispatcher, 0))
{'': {'main': {'active jobs': [],
               'error': None,
               'len': 0,
               'new jobs': [],
               'size': 3}}}
>>> bool(dispatcher.activated)
True
As before, we can ask for a job to be performed, and get the result.
>>> conn = db.open()
>>> root = conn.root()
>>> import zc.async.interfaces
>>> queue = zc.async.interfaces.IQueue(root)
>>> import operator
>>> import zc.async.job
>>> job = queue.put(zc.async.job.Job(operator.mul, 21, 2))
>>> import transaction
>>> transaction.commit()
>>> wait_for_result(job)
42
Hopefully zc.async will be an easy-to-configure, easy-to-use, and useful tool for you! Good luck! [28]
>>> import errno, os, random, socket, tempfile
>>> dir = tempfile.mkdtemp()
>>> site_zcml_file = os.path.join(dir, 'site.zcml')
>>> s = socket.socket()
>>> for i in range(20):
...     monitor_port = random.randint(20000, 49151)
...     try:
...         s.bind(('127.0.0.1', monitor_port))
...     except socket.error, e:
...         if e.args[0] == errno.EADDRINUSE:
...             pass
...         else:
...             raise
...     else:
...         s.close()
...         break
... else:
...     assert False, 'could not find available port'
...     monitor_port = None
...
>>> zope_conf = """
... site-definition %(site_zcml_file)s
...
... <zodb main>
...   <filestorage>
...     create true
...     path %(main_storage_path)s
...   </filestorage>
... </zodb>
...
... <zodb async>
...   <filestorage>
...     create true
...     path %(async_storage_path)s
...   </filestorage>
... </zodb>
...
... <product-config zc.z3monitor>
...   port %(monitor_port)s
... </product-config>
...
... <logger>
...   level debug
...   name zc.async
...   propagate no
...
...   <logfile>
...     path %(async_event_log)s
...   </logfile>
... </logger>
...
... <logger>
...   level debug
...   name zc.async.trace
...   propagate no
...
...   <logfile>
...     path %(async_trace_log)s
...   </logfile>
... </logger>
...
... <eventlog>
...   <logfile>
...     formatter zope.exceptions.log.Formatter
...     path STDOUT
...   </logfile>
...   <logfile>
...     formatter zope.exceptions.log.Formatter
...     path %(event_log)s
...   </logfile>
... </eventlog>
... """ % {'site_zcml_file': site_zcml_file,
...        'main_storage_path': os.path.join(dir, 'main.fs'),
...        'async_storage_path': os.path.join(dir, 'async.fs'),
...        'monitor_port': monitor_port,
...        'event_log': os.path.join(dir, 'z3.log'),
...        'async_event_log': os.path.join(dir, 'async.log'),
...        'async_trace_log': os.path.join(dir, 'async_trace.log'),}
...
>>> os.environ['ZC_ASYNC_UUID'] = os.path.join(dir, 'uuid.txt')
>>> site_zcml = """
... <configure xmlns='http://namespaces.zope.org/zope'
...            xmlns:meta="http://namespaces.zope.org/meta"
...            >
... <include package="zope.component" file="meta.zcml" />
... <include package="zope.component" />
... <include package="zc.z3monitor" />
... <include package="zc.async" file="multidb_dispatcher_policy.zcml" />
...
... <!-- this is usually handled in Zope applications by the
...      zope.app.keyreference.persistent.connectionOfPersistent adapter -->
... <adapter factory="zc.twist.connection" />
... </configure>
... """
>>> zope_conf_file = os.path.join(dir, 'zope.conf')
>>> f = open(zope_conf_file, 'w')
>>> f.write(zope_conf)
>>> f.close()
>>> f = open(site_zcml_file, 'w')
>>> f.write(site_zcml)
>>> f.close()
>>> import zdaemon.zdoptions
>>> import zope.app.appsetup
>>> options = zdaemon.zdoptions.ZDOptions()
>>> options.schemadir = os.path.join(
...     os.path.dirname(os.path.abspath(zope.app.appsetup.__file__)),
...     'schema')
>>> options.realize(['-C', zope_conf_file])
>>> config = options.configroot
>>> import zope.app.appsetup.product
>>> zope.app.appsetup.product.setProductConfigurations(
...     config.product_config)
>>> ignore = zope.app.appsetup.config(config.site_definition)
>>> import zope.app.appsetup.appsetup
>>> db = zope.app.appsetup.appsetup.multi_database(config.databases)[0][0]
>>> import zope.event
>>> import zc.async.interfaces
>>> zope.event.notify(zc.async.interfaces.DatabaseOpened(db))
>>> from zc.async.testing import get_poll, wait_for_result
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
>>> dispatcher.thread.join(3)
>>> db.close()
>>> db.databases['async'].close()
>>> import shutil
>>> shutil.rmtree(dir)
Tips and Tricks
General Tips and Tricks
Avoid long transactions if possible. Really try to avoid long transactions involving frequently written objects. One possible strategy is to divide up your code into a job for low-conflict tasks and one or more jobs for high-conflict tasks, perhaps created in a callback.
Sometimes you can't avoid long transactions. But really try to avoid long commits. Commits hold a lock on the ZODB, and if a single transaction writes so much that the commit takes noticeable time, realize that you are postponing every subsequent commit to the database.
Callbacks should be quick and reliable. If you want to do something that might take a while, put another job in the queue.
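As a rough sketch of the two tips above, a low-conflict job can hand off its high-conflict (or slow) follow-up work as a separate job from within a quick callback. Here gather_data and catalog are hypothetical names; getJob, queue, put, and addCallback are described elsewhere in this document.

>>> def compute():
...     # low-conflict: expensive reads, few writes
...     return gather_data()
...
>>> def store(result):
...     # high-conflict: writes a frequently updated object
...     catalog.index(result)
...
>>> def enqueue_store(result):
...     # quick, reliable callback: enqueue the heavy work as its own job
...     return zc.async.local.getJob().queue.put(
...         zc.async.job.Job(store, result))
...
>>> job = queue.put(compute)
>>> callback = job.addCallback(enqueue_store)
>>> transaction.commit()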
Some tasks are non-transactional. If you want to do them in a Job, you don’t want them to be retried! Use the NeverRetry retry policy for these, as described in the Recovering from Catastrophes section below.
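For instance, something like this sketch; notify_credit_card_company is a hypothetical non-transactional callable, and the retry_policy_factory attribute is the spelling the shipped policies suggest (verify against zc.async.job before relying on it).

>>> job = queue.put(zc.async.job.Job(notify_credit_card_company))
>>> job.retry_policy_factory = zc.async.job.NeverRetry
>>> transaction.commit()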
zc.async works fine with both Python 2.4 and Python 2.5. Note that building Twisted with Python 2.4 generates a SyntaxError in a test, but as of this writing Twisted 8.1.0 is supported for Python 2.4.
Testing Tips and Tricks
In tests, don’t check to see if poll is activated until after the first poll. Try zc.async.testing.get_poll(zc.async.dispatcher.get(), 0), for instance.
In tests, be aware that DemoStorage does not support MVCC and does not support conflict resolution, so you may experience ConflictError problems (write and, particularly, read) with it that you will not experience as much, or at all, with a storage that supports those features, such as FileStorage. Notice that all of the tests in this package use FileStorage.
If you get a failure as a result and you didn’t expect it, don’t forget the getTraceback and printTraceback methods on the failure. The whole point of the failure is to help you diagnose problems.
zc.async.dispatcher.get() will get you the dispatcher. You can then check if it is activated and also use the other introspection and status methods.
The zc.async.testing module has a number of helpful functions for testing. get_poll, given a dispatcher, will give you the next poll. This is a good way to make sure that a job you just put in has had a chance to be claimed by a dispatcher. It's also a reasonable way to verify that the dispatcher has started. Other useful testing functions are zc.async.testing.wait_for_result, which waits for the result on a given job and returns it; and zc.async.testing.wait_for_annotation, which waits for a given annotation on a given job. These are demonstrated in various doctests in this package, but should also be reasonably simple and self-explanatory.
Recovering from Catastrophes
What Might Go Wrong?
Sometimes bad things happen in the course of processing tasks. What might go wrong? How does zc.async handle these errors? What are your responsibilities?
First, what might go wrong?
zc.async could have a problem while polling for jobs. We’ll call this a “polling exception.”
zc.async could have a problem while performing a particular job. We’ll call this a “job-related exception.”
For the purpose of this discussion, we will omit the possibility that zc.async has a bug. That is certainly a possibility, but the recovery story is not predictable, and if we knew of a bug, we’d try to fix it, rather than discuss it here!
We'll discuss both polling exceptions and job-related exceptions, then drill down into some specific scenarios. This will illuminate how your code and zc.async's can work together to handle them.
Polling Exceptions
Polling exceptions are, at least in theory, the least of your worries. You shouldn’t have to worry about them; and if you do, it is probably a basic configuration problem that you need to address, such as making sure that the dispatcher process has access to the needed databases and software; or making sure that the dispatcher process is run by a daemonizing software that will restart if needed, such as zdaemon (http://pypi.python.org/pypi/zdaemon) or supervisor (http://supervisord.org/).
zc.async is largely responsible for dealing with polling exceptions. What does it have to handle?
The process running the poll ends, perhaps in the middle of a poll.
zc.async cannot commit a transaction during the poll, for instance because of a ConflictError, or because the database is unavailable.
What needs to happen to handle these problems?
Process Ends while Polling
If the process ends, your daemonizing front-end (zdaemon, supervisor, etc.) needs to restart it. The ZODB will discard incomplete transaction data, if any.
The only thing a zc.async dispatcher needs to handle is cleanup.
Ideally it will be able to deactivate its record in the ZODB during the process shutdown.
Failing that, if it was a "hard crash" that didn't allow deactivation, a sibling dispatcher will realize that the dispatcher is down and deactivate it.
Or, finally, if it was a hard crash without a sibling, and the daemon restarts a process for the original dispatcher instance, the new process needs to realize that the old process is dead, not competing with it.
Transaction Error while Polling
If the poll gets a conflict error, it should simply abort and retry the poll, forever, with a small back-off.
If the database goes away (perhaps the ZEO server goes down for a bit, and the ZEO client to which the dispatcher is connected is trying to reconnect) it should gracefully try to wait for the database to return, and resume when it does.
Other, more dramatic errors, such as POSKeyErrors, are generally considered to be out of zc.async's domain and control. It should ideally continue to try to resume as long as the process is alive, in case somehow the situation improves, but this may be difficult, and the expectations for zc.async's recovery are lower than with ConflictErrors and ClientDisconnected errors.
Summary of Polling Exceptions
To repeat, then, polling exceptions have two basic scenarios.
If a dispatcher process ends, it needs to deactivate its record in the ZODB, or let another process know to deactivate it.
If a ZODB.POSException.ConflictError occurs, retry forever with a small backoff; or if ZEO.Exceptions.ClientDisconnected occurs, retry forever with a small backoff, waiting for the database to come back.
Most anything else will ideally keep zc.async attempting to re-poll, but it may not happen: expectations are lower.
Your Responsibilities
As the author of a zc.async job, your responsibilities are to handle your own exceptions and to make sure that the retry policy for each job is appropriate. This is controlled with an IRetryPolicy, as shown below.
As someone configuring a running dispatcher, you need to make sure that you give the dispatcher the necessary access to databases and software to perform your jobs, and you need to review (and rotate!) your logs.
zc.async’s Responsibilities
zc.async needs to keep polling robust in the face of restarts, ConflictErrors, and ClientDisconnected errors. It needs to give your code a chance to decide what to do in these circumstances, and log your errors.
Retry Policies
The rest of the document uses scenarios to illustrate how zc.async handles errors, and how you might want to configure retry policies.
What is a retry policy? It is used in three circumstances.
When the job starts but fails to complete because the system is interrupted, the job will try to call retry_policy.interrupted() to get a boolean as to whether the job should be retried.
When the code the job ran fails, the job will try to call retry_policy.jobError(failure, data_cache) to get a boolean as to whether the job should be retried.
When the commit fails, the job will try to call retry_policy.commitError(failure, data_cache) to get a boolean as to whether the job should be retried.
Why does this need to be a policy? Can’t it be a simpler arrangement?
The heart of the problem is that different jobs need different error resolutions.
In some cases, jobs may not be fully transactional. For instance, the job may be communicating with an external system, such as a credit card system. The retry policy here should typically be “never”: perhaps a callback should be in charge of determining what to do next.
If a job is fully transactional, it can be retried. But even then the desired behavior may differ.
In typical cases, some errors should simply cause a failure, while other errors, such as database conflict errors, should cause a limited number of retries.
In some jobs, conflict errors should be retried forever, because the job must be run to completion or else the system should fall over. Callbacks that try to handle errors themselves may take this approach, for instance.
zc.async currently ships with three retry policies.
The default, appropriate for most fully transactional jobs, is zc.async.job.RetryCommonFourTimes. This retries ZEO disconnects forever, and retries interrupts and transaction errors, such as conflicts, a set number of times.
The other available (pre-written) option for transactional jobs is zc.async.job.RetryCommonForever. Callbacks will get this policy by default. This retries ZEO disconnects, transaction errors such as conflict errors, interrupts, and anything that happens during the job’s commit, forever.
The last retry policy is zc.async.job.NeverRetry. This is appropriate for non-transactional jobs. You’ll still typically need to handle errors in your callbacks.
If you look at these, you will see that it is trivial to write your own, if desired.
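As an illustration, here is a minimal sketch of a custom policy built on the three methods described above. The constructor signature (receiving the job) and the retry_policy_factory attribute used to attach it are assumptions modeled on the shipped policies; check zc.async.job for the exact spelling.

>>> class RetryJobErrorsThreeTimes(object):
...     # hypothetical policy: retry job and commit errors up to three
...     # times; never retry after an interruption.
...     def __init__(self, job):
...         self.job = job
...         self.errors = 0
...     def interrupted(self):
...         return False
...     def jobError(self, failure, data_cache):
...         self.errors += 1
...         return self.errors <= 3
...     def commitError(self, failure, data_cache):
...         self.errors += 1
...         return self.errors <= 3
...
>>> job = queue.put(zc.async.job.Job(send_invoice))  # hypothetical callable
>>> job.retry_policy_factory = RetryJobErrorsThreeTimes
>>> transaction.commit()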
Scenarios
We’ll examine polling error scenarios and job error scenarios.
Polling errors
The system is polling and gets a ConflictError.
The system is polling and gets a ClientDisconnected error.
Job errors
A worker process is working on a job with the default retry policy. The process dies gracefully and restarts.
Like the previous scenario, a worker process is working on a job with the default retry policy. The process crashes hard (does not die gracefully) and restarts.
Like the previous scenario, a worker process is working on a job with the default retry policy. The process crashes hard (does not die gracefully) and a sibling notices and takes over.
A worker process is working on a job with the default retry policy and gets an error during the job or the commit.
Scenarios: Polling Errors
ConflictError
A common place for a conflict error is with two dispatchers trying to claim the same job from the queue. This example will mimic that situation.
Imagine we have a full set up with a dispatcher, agent, and queue. [29] We’ll actually replace the agent’s chooser with one that behaves badly: it blocks, waiting for our lock.
>>> import threading
>>> lock1 = threading.Lock()
>>> lock2 = threading.Lock()
>>> lock1.acquire()
True
>>> lock2.acquire()
True
>>> def acquireLockAndChooseFirst(agent):
...     res = agent.queue.claim()
...     if res is not None:
...         lock2.release()
...         lock1.acquire()
...     return res
...
>>> import zc.async.instanceuuid
>>> import zc.async.interfaces
>>> import zc.async.testing
>>> import zc.async.dispatcher
>>> import pprint
>>> dispatcher = zc.async.dispatcher.get()
>>> pprint.pprint(zc.async.testing.get_poll(dispatcher, 0))
{'': {'main': {'active jobs': [],
               'error': None,
               'len': 0,
               'new jobs': [],
               'size': 3}}}
>>> import transaction
>>> _ = transaction.begin()
>>> queues = root[zc.async.interfaces.KEY]
>>> queue = queues['']
>>> da = queue.dispatchers[zc.async.instanceuuid.UUID]
>>> agent = da['main']
>>> agent.chooser = acquireLockAndChooseFirst
>>> def returnSomething():
...     return 42
...
>>> job = queue.put(returnSomething)
>>> transaction.commit()
Now, when the agent tries to get our job, we’ll start and commit another transaction that removes it from the queue. This will generate a conflict error for the poll’s thread and transaction, because it cannot also remove the same job.
>>> lock2.acquire()
True
>>> _ = transaction.begin()
>>> job is queue.pull()
True
>>> transaction.commit()
>>> lock1.release()
However, the ConflictError is handled, and polling continues.
>>> _ = transaction.begin()
>>> import zc.async.agent
>>> agent.chooser = zc.async.agent.chooseFirst
>>> transaction.commit()
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> import zc.async.testing
>>> pprint.pprint(zc.async.testing.get_poll(dispatcher))
{'': {'main': {'active jobs': [],
               'error': None,
               'len': 0,
               'new jobs': [],
               'size': 3}}}
And if we put the job back, it will be performed.
>>> job is queue.put(job)
True
>>> transaction.commit()
>>> zc.async.testing.wait_for_result(job)
42
Client Disconnected
The story is very similar if the ZEO connection goes away for a while. We'll mimic a ZEO ClientDisconnected error by monkeypatching transaction.TransactionManager.commit.
>>> lock1.locked()
True
>>> lock2.locked()
True

>>> agent.chooser = acquireLockAndChooseFirst
>>> job = queue.put(returnSomething)
>>> transaction.commit()

>>> lock2.acquire()
True
>>> import ZEO.Exceptions
>>> def commit(self):
...     raise ZEO.Exceptions.ClientDisconnected()
...
>>> import transaction
>>> old_commit = transaction.TransactionManager.commit
>>> transaction.TransactionManager.commit = commit
>>> import time
>>> sleep_requests = []
>>> def sleep(i):
...     sleep_requests.append(i)
...
>>> old_sleep = time.sleep
>>> time.sleep = sleep
>>> agent.chooser = zc.async.agent.chooseFirst
>>> transaction.commit()
>>> lock1.release()
>>> pprint.pprint(zc.async.testing.get_poll(dispatcher)) # doctest: +ELLIPSIS
{'': {'main': {'active jobs': [],
               'error': None,
               'len': 0,
               'new jobs': [(..., 'unnamed')],
               'size': 3}}}
>>> transaction.TransactionManager.commit = old_commit
>>> zc.async.testing.wait_for_result(job)
42
>>> bool(sleep_requests)
True
Here’s another variant that mimics being unable to read the storage during a poll, and then recuperating.
>>> error_raised = False
>>> def raiseDisconnectedThenChooseFirst(agent):
...     global error_raised
...     if not error_raised:
...         error_raised = True
...         raise ZEO.Exceptions.ClientDisconnected()
...     return agent.queue.claim()
...
>>> agent.chooser = raiseDisconnectedThenChooseFirst
>>> def returnSomething():
...     return 42
...
>>> job = queue.put(returnSomething)
>>> transaction.commit()
>>> pprint.pprint(zc.async.testing.get_poll(dispatcher)) # doctest: +ELLIPSIS
{'': {'main': {'active jobs': [],
               'error': <zc.twist.Failure ...ClientDisconnected>,
               'len': 0,
               'new jobs': [],
               'size': 3}}}
>>> zc.async.testing.wait_for_result(job)
42
Zope 3 General Tips and Tricks
If you use Zope 3, sometimes you want async jobs that have local sites and security set up. zc.async.z3.Job is a subclass of the main zc.async.job.Job implementation that leverages the setUp and tearDown hooks to accomplish this.
It takes the site and the ids of the principals in the security context at its instantiation. The values can be mutated. Then when the job runs it sets the context up for the job’s code, and then tears it down after the work has been committed (or aborted, if there was a failure). This can be very convenient for jobs that care about site-based component registries, or that care about the participants in zope.security interactions.
This is different from a try: finally: wrapper around your main code that does the work, both because it is handled for you transparently, and because the context is cleaned up after the job's main transaction is committed. This means that code that expects a security or site context during a pre-transaction hook will be satisfied.
For instance, let’s imagine we have a database, and we establish a local site and an interaction with a request. [33] [34] Unfortunately, this is a lot of set up. [35]
>>> import zope.app.component.hooks
>>> zope.app.component.hooks.setSite(site)
>>> import zope.security.management
>>> import zc.async.z3
>>> zope.security.management.newInteraction(
...     zc.async.z3.Participation(mickey)) # usually would be a request
Now we create a new job.
>>> def reportOnContext():
...     print (zope.app.component.hooks.getSite().__class__.__name__,
...            tuple(p.principal.id for p in
...                  zope.security.management.getInteraction().participations))
...
>>> j = root['j'] = zc.async.z3.Job(reportOnContext)
The ids of the principals in the participations in the current interaction are in a participants tuple. The site is on the job’s site attribute.
>>> j.participants
('mickey',)
>>> j.site is site
True
If we end the interaction, clear the local site, and run the job, the job we used (reportOnContext above) shows that the context was correctly in place.
>>> zope.security.management.endInteraction()
>>> zope.app.component.hooks.setSite(None)
>>> transaction.commit()
>>> j()
('StubSite', ('mickey',))
However, now the site and interaction are empty.
>>> print zope.security.management.queryInteraction()
None
>>> print zope.app.component.hooks.getSite()
None
As mentioned, the context will be maintained through the transaction’s commit. Let’s illustrate.
>>> import zc.async
>>> import transaction.interfaces
>>> def setTransactionHook():
...     t = transaction.interfaces.ITransactionManager(j).get()
...     t.addBeforeCommitHook(reportOnContext)
...
>>> zope.app.component.hooks.setSite(site)
>>> zope.security.management.newInteraction(
...     zc.async.z3.Participation(mickey), zc.async.z3.Participation(jack),
...     zc.async.z3.Participation(foo)) # >1 == rare but possible scenario
>>> j = root['j'] = zc.async.z3.Job(setTransactionHook)
>>> j.participants
('mickey', 'jack', 'foo')
>>> j.site is site
True

>>> zope.security.management.endInteraction()
>>> zope.app.component.hooks.setSite(None)
>>> transaction.commit()
>>> j()
('StubSite', ('mickey', 'jack', 'foo'))

>>> print zope.security.management.queryInteraction()
None
>>> print zope.app.component.hooks.getSite()
None
>>> from ZODB.tests.util import DB
>>> db = DB()
>>> conn = db.open()
>>> root = conn.root()
>>> import zc.async.configure
>>> zc.async.configure.base()
>>> import zc.async.testing
>>> zc.async.testing.setUpDatetime() # pins datetimes
Without a site or an interaction, you can still instantiate and run the job normally.
>>> import zc.async.z3
>>> import operator
>>> j = root['j'] = zc.async.z3.Job(operator.mul, 6, 7)
>>> j.participants
()
>>> print j.site
None
>>> import transaction
>>> transaction.commit()
>>> j()
42
To do this, we need to set up the zope.app.component hooks, create a site, set up an authentication utility, and create some principals that the authentication utility can return.
>>> import zope.app.component.hooks
>>> zope.app.component.hooks.setHooks()
>>> import zope.app.component.site
>>> import persistent
>>> class StubSite(persistent.Persistent,
...                zope.app.component.site.SiteManagerContainer):
...     pass
...
>>> site = root['site'] = StubSite()
>>> sm = zope.app.component.site.LocalSiteManager(site)
>>> site.setSiteManager(sm)
>>> import zope.security.interfaces
>>> import zope.app.security.interfaces
>>> import zope.interface
>>> import zope.location
>>> class StubPrincipal(object):
...     zope.interface.implements(zope.security.interfaces.IPrincipal)
...     def __init__(self, identifier, title, description=''):
...         self.id = identifier
...         self.title = title
...         self.description = description
...
>>> class StubPersistentAuth(persistent.Persistent,
...                          zope.location.Location):
...     zope.interface.implements(
...         zope.app.security.interfaces.IAuthentication)
...     _mapping = {'foo': 'Foo Fighter',
...                 'jack': 'Jack, Giant Killer',
...                 'mickey': 'Mickey Mouse'}
...     def getPrincipal(self, principal_id):
...         return StubPrincipal(principal_id, self._mapping[principal_id])
...
>>> auth = StubPersistentAuth()
>>> sm.registerUtility(auth, zope.app.security.interfaces.IAuthentication)
>>> transaction.commit()
>>> mickey = auth.getPrincipal('mickey')
>>> jack = auth.getPrincipal('jack')
>>> foo = auth.getPrincipal('foo')
Zope 3 Testing Tips and Tricks
Summary
Make sure you are using zope.app.testing version 3.4.2 or newer, or else ftests may intermittently raise spurious errors having to do with a missing _result attribute on a request’s response.
The Zope 3 tests use DemoStorage, which does not use MVCC. This can lead to your tests having occasional ConflictErrors that will not occur in production. In common cases for ftests, you won’t notice these because of Zope’s usual retry policy. Unit or integration tests may show these problems.
Set up the basic configuration in zcml or Python (see examples below), but make sure that ftests do not use dispatchers started by the application. Start ftest (or integration test) dispatchers separately, using zc.async.ftesting.setUp, and tear them down after the tests are done with zc.async.ftesting.tearDown.
The ftest dispatcher polls every tenth of a second, so you shouldn't need to wait long for your job to get started in your tests.
General zc.async testing tools such as zc.async.dispatcher.get, zc.async.testing.get_poll and zc.async.testing.wait_for_result can still be useful for in-depth zc.async tests.
If you don't want to dig into the guts in your functional tests to use the tools described in the previous point, consider making a view to check on job status using a data structure like JSON, and looking at that in your tests (a sketch follows this list).
The setUp code by default sends critical log messages to __stdout__ so it can help diagnose why a callback might never complete.
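For example, such a view might look something like the following sketch. The class, its registration, and the spot where the job was stored (context.pending_result) are all hypothetical; only the job's status attribute comes from this package.

>>> import simplejson
>>> class JobStatusView(object):
...     # hypothetical browser view; register it for your content type
...     def __init__(self, context, request):
...         self.context = context
...         self.request = request
...     def __call__(self):
...         job = self.context.pending_result  # wherever you stored the job
...         return simplejson.dumps({'status': str(job.status)})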
Discussion
Normally, in a Zope 3 configuration that uses zc.async, you configure it when you start your application. For instance, you might include a zc.async zcml file like basic_dispatcher_policy.zcml that performs the necessary set up.
However, the Zope 3 ftesting layer database dance doesn’t play well with zc.async unless you take a bit of extra care.
This is because zc.async will be started with the ftests’ underlying database, and then the test will be run with a DemoStorage wrapper. The zc.async dispatcher will run, then, but it will never see the changes that you make in the wrapper DemoStorage that your test manipulates. This can be mystifying and frustrating.
Because of this, when you write a Zope 3 app that wants to use both layered ftests and zc.async, you have to set things up in a mildly inconvenient way.
When you start your application normally, use configuration (zcml or grok or whatever) to register subscribers like the ones in subscribers.py: adding queues, starting dispatchers, and adding agents.
But don’t have this configuration registered for your ftests. Instead, bypass that part of your site’s configuration in your ftesting layer, and use the zc.async.ftesting.setUp function to set zc.async up in tests when you need it, in a footnote of your test or in a similar spot.
You’ll still want the basic adapters registered, as found in zc.async’s configure.zcml or configure.py files; and maybe the zc.async.queue.getDefaultQueue adapter too. This can be registered in ftesting.zcml with this snippet:
<include package="zc.async" />
<adapter factory="zc.async.queue.getDefaultQueue" />
Or in Python, you might want to do something like this:
>>> import zc.async.configure
>>> zc.async.configure.base() # or, more likely, ``minimal`` for Zope 3
>>> import zope.component
>>> import zc.async.queue
>>> zope.component.provideAdapter(zc.async.queue.getDefaultQueue)
Don’t forget to call tearDown (see below) at the end of your test!
Here’s a usage example.
As mentioned above, setUp does expect the necessary basic adapters to already be installed.
Zope 3 ftests generally have a getRootFolder function hanging around to give you the root object of the Zope application (which is not the root of the ZODB). Therefore, this function tries to be helpful, for better and worse, and mucks around in the locals to find it. If you want it to leave your locals alone, pass it a database connection.
So, here’s some set up. We create a database and make our stub getRootFolder function in the globals.
>>> import transaction
>>> import BTrees
>>> import ZODB.FileStorage
>>> storage = ZODB.FileStorage.FileStorage(
...     'zc_async.fs', create=True)
>>> from ZODB.DB import DB
>>> db = DB(storage)
>>> conn = db.open()
>>> root = conn.root()
>>> PseudoZopeRoot = root['Application'] = BTrees.family32.OO.BTree()
>>> transaction.commit()
>>> def _getRootObject():
...     return PseudoZopeRoot
...
>>> globals()['getRootFolder'] = _getRootObject
Notice we are using a real FileStorage, and not a DemoStorage, as is usually used in ftests. The fact that DemoStorage does not have MVCC can sometimes lead standard ftests to raise spurious ReadConflictErrors that will not actually occur in production. The ConflictErrors will generally be retried, so your tests should usually pass, even though you might see some “complaints”.
Now we can call setUp as if we were in a functional test.
>>> import zc.async.ftesting
>>> zc.async.ftesting.setUp()
Now the dispatcher is activated and the polls are running. The function sets up a dispatcher that polls much more frequently than usual (every 0.1 seconds rather than every 5) so that tests might run faster, but otherwise uses typical zc.async default values.
It’s worth noting a few tricks that are particularly useful for tests here. We’ll also use a couple of them to verify that setUp did its work.
zc.async.dispatcher.get() returns the currently installed dispatcher. This can let you check if it is activated and polling and use its simple statistical methods, if you want.
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
For now, we’ll just see that the dispatcher is activated.
>>> bool(dispatcher.activated)
True
See dispatcher.txt for details on the information you can get from the dispatcher object.
zc.async.testing has a number of helpful functions for testing. get_poll is the most pertinent here: given a dispatcher, it will give you the next poll. This is a good way to make sure that a job you just put in has had a chance to be claimed by a dispatcher. It’s also a reasonable way to verify that the dispatcher has started. setUp already gets the first two polls, so it’s definitely all started.
>>> import zc.async.testing
>>> import pprint
>>> pprint.pprint(zc.async.testing.get_poll(dispatcher))
{'': {'main': {'active jobs': [],
               'error': None,
               'len': 0,
               'new jobs': [],
               'size': 3}}}
Other useful testing functions are zc.async.testing.wait_for_result, which waits for the result on a given job and returns it; and zc.async.testing.wait_for_annotation, which waits for a given annotation on a given job. These are demonstrated in various doctests in this package, but should also be reasonably simple and self-explanatory.
Callbacks will retry some errors forever, by default. The logic is that callbacks are often the “cleanup” and must be run. This can lead to confusion in debugging tests, though, because the retry warnings are sent to the log, and the log is not usually monitored in functional tests.
setUp tries to help with this by adding logging of CRITICAL log messages in the “zc.async” logger to stdout.
>>> import logging
>>> logging.getLogger('zc.async.event').critical('Foo!')
Foo!
>>> logging.getLogger('zc.async.event').error('Bar!')
Once you have finished your tests, make sure to shut down your dispatcher, or the testing framework will complain about an unstopped daemon thread. zc.async.ftesting.tearDown will do the trick.
>>> zc.async.ftesting.tearDown()
>>> dispatcher.activated
False
You can then start another async-enabled functional test up again later in the same layer, of course.
>>> zc.async.ftesting.setUp()
>>> dispatcher = zc.async.dispatcher.get()
>>> bool(dispatcher.activated)
True

>>> zc.async.ftesting.tearDown()
>>> dispatcher.activated
False
Also worth noting, as mentioned in the summary, is that you should use zope.app.testing version 3.4.2 or higher to avoid getting spurious, intermittent bug reports from ftests that use zc.async.
In your test or your test’s tearDown, if you used a FileStorage, as we did here, you’ll need to clean up as well. We normally do this in our tests’ tearDowns, but we do it here, now, to make the point.
>>> db.close()
>>> storage.close()
>>> storage.cleanup()

>>> del storage # we just do this to not confuse our own tearDown code.
>>> del globals()['getRootFolder'] # clean up globals
Changes
1.4 (2008-07-30)
Mentioned in ftesting.txt that Zope 3 users should use zope.app.testing 3.4.2 or newer. Also added a summary section at the beginning of that file.
Added logging of critical messages to __stdout__ for ftesting.setUp. This can help in discovering problems in callback transactions. This uses a new helper function, print_logs, in zc.async.testing, which is primarily intended to be used for quick-and-dirty debugging.
Changed testing.wait_for_result and testing.wait_for_annotation to ignore ReadConflictErrors, so they can be used more reliably in tests that use MappingStorage, and other storages without MVCC.
Support <type ‘builtin_function_or_method’> for adaptation to Job.
Add warning about long commits to tips and tricks.
If a polling dispatcher discovers that it has been deactivated even though it is not really dead, it now complains in the logs and reactivates itself.
No longer use intermediate job to implement the success/failure addCallbacks behavior. Introduce an ICallbackProxy that can be used for this kind of behavior instead. This change was driven by two desires.
Don’t log the intermediate result. It makes logs harder to read with unnecessary duplications of pertinent data hidden within unimportant differences in the log entries.
Don’t unnecessarily remember errors in success/failure callbacks. This can cause unnecessary failures in unusual situations.
The callback proxy accepts callbacks, which are added to the selected job (success or failure) when the job is selected.
This change introduces some hopefully trivial incompatibilities, which basically come down to the callback being a proxy, not a real job. Use the convenience properties success and failure on the proxy to look at the respective jobs. After the proxy is evaluated, the job attribute will hold the job that was actually run. status and result are conveniences to get the status and result of the selected job.
Add parallel and serial convenience functions to zc.async.job to make it trivial to schedule and process decomposed jobs.
Add start convenience function to zc.async.configure to make it trivial to start up a common-case configuration of a zc.async dispatcher.
No longer use protected attributes of callbacks in resumeCallbacks.
The “local” code is now moved out from the dispatcher module to threadlocal. This is to recognize that the local code is now modified outside of the dispatcher module, as described in the next bullet.
Jobs, when called, are responsible for setting the “local” job value. This means that zc.async.local.getJob() always returns the currently running job, whether it is a top-level job (as before) or a callback (now).
Start on S5 QuickStart presentation (see QUICKSTART_1_VIRTUALENV.txt in package).
1.3 (2008-07-04)
added “Tips and Tricks” and incorporated into the PyPI page.
added setUp and tearDown hooks to Job class so that code can run before and after the main job’s code. The output of setUp is passed as an argument to tearDown so that one can pass state to the other, if needed. setUp is run immediately before the actual job call. tearDown runs after the transaction is committed, or after it was aborted if there was a failure. A retry requested by a retry policy causes the methods to be run again. A failure in setUp is considered to be a failure in the job, as far as the retryPolicy is concerned (i.e., the job calls the retry policy’s jobError method). If setUp fails, the job is not called, but tearDown is. tearDown will fail with a critical log message, but then processing will continue.
using the new setUp and tearDown hooks, added a Zope 3-specific Job subclass (see zc.async.z3.Job) that remembers the zope.app.component site and interaction participants when instantiated. These can be mutated. Then, when the job is run, the setUp sets up the site and a security interaction with the old participants, and then the tearDown tears it all down after the transaction has committed.
changed retry policy logs to “WARNING” level, from “INFO” level.
changed many dispatcher errors to “CRITICAL” level from “ERROR” level.
added “CRITICAL” level logs for “other” commit retries on the RetryCommonForever retry policy.
added remove method on queue.
added helpers for setting up and tearing down Zope 3 functional tests (ftesting.py), and a discussion of how to write Zope 3 functional tests with layers (zope.app.testing.functional) in ftesting.txt.
remove obsolete retry approach for success/failure callbacks (completeStartedJobArguments): it is now handled by retry policies.
remove odd full-path self-references within the utils module.
renamed zc.async.utils.try_transaction_five_times to zc.async.utils.try_five_times.
doc improvements and fixes (thanks to Zvezdan Petkovic and Gintautas Miliauskas).
the z3 “extra” distutils target now explicitly depends on zope.security, zope.app.security, and zope.app.component. This almost certainly does not increase the practical dependencies of the z3 extras, but it does reflect new direct dependencies of the z3-specific modules in the package.
1.2 (2008-06-20)
made the log for finding an activated agent report the pertinent queue’s oid as an unpacked integer, rather than the packed string blob. Use ZODB.utils.p64 to convert back to an oid that the ZODB will recognize.
Bugfix: in failing a job, the job thought it was in its old agent, and the fail call failed. This is now tested by the first example in new doctest catastrophes.txt.
jobs no longer default to a begin_by value of one hour after the begin_after. The default now is no limit.
Made dispatcher much more robust to transaction errors and ZEO ClientDisconnected errors.
Jobs now use an IRetryPolicy to decide what to do on failure within a job, within the commit of the result, and if the job is interrupted. This allows support of transactional jobs, transactional jobs that critically must be run to completion, and non-transactional jobs such as communicating with an external service.
The default retry policy supports retries for ClientDisconnected errors, transaction errors, and interruptions.
job.txt has been expanded significantly to show error handling and the use of retry policies. New file catastrophes.txt shows handling of other catastrophes, such as interruptions to polling.
job errors now go in the main zc.async.event log rather than in the zc.async.trace log. Successes continue to go in the trace log.
callback failures go to the main log as a CRITICAL error, by default.
handleInterrupt is the new protocol on jobs to inform them that they were active in a dispatcher that is now dead. They either fail or reschedule, depending on the associated IRetryPolicy for the job. If they reschedule, this should either be a datetime or timedelta. The job calls the agent’s reschedule method. If the timedelta is empty or negative, or the datetime is earlier than now, the job is put back in the queue with a new putBack method on the queue. This is intended to be the opposite of claim. Jobs put in the queue with putBack will be pulled out before any others.
convert to using zope.minmax rather than locally defined Atom.
Fix (and simplify) last_ping code so as to reduce unnecessarily writing the state of the parent DispatcherAgents collection to the database whenever the atom changed.
Depends on new release of zc.twist (1.3)
Switched dispatcher’s in-memory storage of job and poll information to be per job or per poll, respectively, rather than per time period, so as to try and make memory usage more predictable (for instance, whether a dispatcher is whipping through lots of jobs quickly, or doing work more slowly).
1.1.1 (2008-05-14)
more README tweaks.
converted all reports from the dispatcher, including the monitor output, to use “unpacked” integer oids. This addresses a problem that simplejson was having in trying to interpret the packed string blobs as unicode, and then making zc.ngi fall over. To get the object, then, you’ll need to use ZODB.utils.p64, like this: connection.get(ZODB.utils.p64(INTEGER_OID)), where INTEGER_OID indicates the integer oid of the object you want to examine.
added several more tests for the monitor code.
made the async jobs monitor command be “up to the minute”. Before, it included all of the new and active jobs from the previous poll; now, it also filters out those that have since completed.
The async job command was broken, as revealed by a new monitor test. Fixed, which also means we need a new version of zope.bforest (1.2) for a new feature there.
1.1 (2008-04-24)
Fired events when the IQueues and IQueue objects are installed by the QueueInstaller (thanks to Fred Drake).
Dispatchers make agent threads keep their connections, so each connection’s object cache use is optimized if the agent regularly requests jobs with the same objects.
README improved (thanks to Benji York and Sebastian Ware).
Callbacks are logged at start in the trace log.
All job results (including callbacks) are logged, including verbose tracebacks if the callback generated a failure.
Had the ThreadedDispatcherInstaller subscriber stash the thread on the dispatcher, so you can shut down tests like this:
>>> import zc.async.dispatcher
>>> dispatcher = zc.async.dispatcher.get()
>>> dispatcher.reactor.callFromThread(dispatcher.reactor.stop)
>>> dispatcher.thread.join(3)
Added getQueue to zc.async.local as a convenience (it does what you could already do: zc.async.local.getJob().queue).
Clarified that IQueue.pull is the approved way of removing scheduled jobs from a queue in interfaces and README.
reports in the logs of a job’s success or failure come before callbacks are started.
Added a section showing how the basic_dispatcher_policy.zcml worked, which then pushed the former README_3 examples into README_3b.
Put ZPL everywhere I was supposed to.
Moved a number of helpful testing functions out of footnotes and into zc.async.testing, both so that zc.async tests don’t have to redefine them and client packages can reuse them.
1.0 (2008-04-09)
Initial release.