hurry.workflow is a simple workflow system. It can be used to implement stateful multi-version workflows for Zope 3 applications.
Project description
A simple but quite nifty workflow system for Zope 3.
hurry.workflow changes
0.9.2.1 (2007-08-15)
Bug fixes
Oops, the patches in 0.9.2 were not actually applied. Fixed them now.
0.9.2 (2007-08-15)
Bug fixes
zope.security changes broke imports in hurry.workflow.
localUtility directive is now deprecated, so don’t use it anymore.
0.9.1 (2006-09-22)
Feature changes
First cheeseshop release.
0.9 (2006-06-15)
Feature changes
separate out from hurry package into hurry.workflow
eggification work
Zope 3.3 compatibility work
0.8 (2006-05-01)
Feature changes
Initial public release.
Detailed Documentation
Hurry Workflow
The hurry workflow system is a “roll my own because I’m in a hurry” framework.
Basic workflow
Let’s first make a content object that can go into a workflow:
>>> from zope.interface import implements, Attribute
>>> from zope.annotation.interfaces import IAttributeAnnotatable
>>> class IDocument(IAttributeAnnotatable):
...     title = Attribute('Title')
>>> class Document(object):
...     implements(IDocument)
...     def __init__(self, title):
...         self.title = title
As you can see, such a content object must provide IAnnotatable, as this is used to store the workflow state. The system uses the IWorkflowState adapter to get and set an object’s workflow state:
>>> from hurry.workflow import interfaces
>>> document = Document('Foo')
>>> state = interfaces.IWorkflowState(document)
>>> print state.getState()
None
The state can be set directly for an object using the IWorkflowState adapter as well:
>>> state.setState('foo')
>>> state.getState()
'foo'
But let’s set it back to None again, so we can start again in a pristine state for this document:
>>> state.setState(None)
It’s not recommended to use setState() to do this ourselves, though: usually we’ll let the workflow system take care of state transitions and the setting of the initial state.
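The idea of keeping the workflow state in an object's annotations, behind a small adapter, can be pictured with a stand-alone sketch. It does not import hurry.workflow or zope.annotation; STATE_KEY, the annotations attribute and the WorkflowStateSketch class are hypothetical names chosen for illustration, not the package's implementation.

```python
# Stand-alone sketch: this does NOT import hurry.workflow or zope.annotation.
# STATE_KEY and the class names are hypothetical.
STATE_KEY = 'example.workflow.state'


class Document(object):
    def __init__(self, title):
        self.title = title
        # Plays the role of the annotations mapping of an annotatable object.
        self.annotations = {}


class WorkflowStateSketch(object):
    """Adapter-style wrapper that reads and writes the state of one object."""

    def __init__(self, context):
        self.context = context

    def getState(self):
        # An object that never entered the workflow has no state yet.
        return self.context.annotations.get(STATE_KEY)

    def setState(self, state):
        self.context.annotations[STATE_KEY] = state


document = Document('Foo')
state = WorkflowStateSketch(document)
initial = state.getState()
state.setState('foo')
# A second adapter for the same object sees the stored state.
seen_by_other_adapter = WorkflowStateSketch(document).getState()
```

Because the state lives on the object rather than on the adapter, any adapter created later for the same document reports the same state.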
Now let’s define a simple workflow transition from ‘a’ to ‘b’. It needs a condition which must return True before the transition is allowed to occur:
>>> def NullCondition(wf, context):
...     return True
and an action that takes place when the transition is taken:
>>> def NullAction(wf, context):
...     pass
Now let’s construct a transition:
>>> from hurry.workflow import workflow
>>> transition = workflow.Transition(
...     transition_id='a_to_b',
...     title='A to B',
...     source='a',
...     destination='b',
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL)
The transition trigger is either MANUAL, AUTOMATIC or SYSTEM. MANUAL indicates user action is needed to fire the transition. AUTOMATIC transitions fire automatically. SYSTEM is a workflow transition directly fired by the system, and not directly by the user.
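The three trigger kinds can be pictured with a small stand-alone sketch. It does not import hurry.workflow; the Transition tuple, the three constants and the example transition ids are all made up for illustration, and the lookup function is only an assumption about how such a filter could be written.

```python
from collections import namedtuple

# Hypothetical stand-ins for interfaces.MANUAL, AUTOMATIC and SYSTEM.
MANUAL, AUTOMATIC, SYSTEM = 'manual', 'automatic', 'system'

Transition = namedtuple('Transition',
                        'transition_id source destination trigger')

TRANSITIONS = [
    Transition('submit', 'draft', 'pending', MANUAL),
    Transition('expire', 'draft', 'expired', AUTOMATIC),
    Transition('import', 'draft', 'pending', SYSTEM),
    Transition('approve', 'pending', 'approved', MANUAL),
]


def transition_ids(state, trigger):
    """Return ids of transitions that leave `state` and use `trigger`."""
    return [t.transition_id for t in TRANSITIONS
            if t.source == state and t.trigger == trigger]


manual_ids = transition_ids('draft', MANUAL)        # offered to a user
automatic_ids = transition_ids('draft', AUTOMATIC)  # fired without a user
system_ids = transition_ids('draft', SYSTEM)        # fired by application code
```

Keeping the trigger on the transition itself means one transition table can serve a user interface (manual), a periodic job (automatic) and application code (system) without separate bookkeeping.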
We will also introduce an initial transition, which moves an object into the workflow (for instance just after it is created):
>>> init_transition = workflow.Transition(
...     transition_id='to_a',
...     title='Create A',
...     source=None,
...     destination='a')
And a final transition, when the object moves out of the workflow again (for instance just before it is deleted):
>>> final_transition = workflow.Transition(
...     transition_id='finalize',
...     title='Delete',
...     source='b',
...     destination=None)
Now let’s put the transitions in a workflow utility:
>>> wf = workflow.Workflow([transition, init_transition, final_transition])
>>> from zope.app.testing import ztapi
>>> ztapi.provideUtility(interfaces.IWorkflow, wf)
Workflow transitions cause events to be fired; we will put in a simple handler so we can check whether things were successfully fired:
>>> events = []
>>> def transition_handler(event):
...     events.append(event)
>>> ztapi.subscribe([interfaces.IWorkflowTransitionEvent], None,
...                 transition_handler)
To get what transitions to other states are possible from an object, as well as to fire transitions and set initial state, we use the IWorkflowInfo adapter:
>>> info = interfaces.IWorkflowInfo(document)
We’ll initialize the workflow by firing the ‘to_a’ transition:
>>> info.fireTransition('to_a')
This should’ve fired an event:
>>> events[-1].transition.transition_id
'to_a'
>>> events[-1].source is None
True
>>> events[-1].destination
'a'
There’s only a single transition defined to workflow state ‘b’:
>>> info.getManualTransitionIds()
['a_to_b']
We can also get this by asking which manual (or system) transition exists that brings us to the desired workflow state:
>>> info.getFireableTransitionIdsToward('b')
['a_to_b']
Since this is a manually triggered transition, we can fire this transition:
>>> info.fireTransition('a_to_b')
The workflow state should now be ‘b’:
>>> state.getState()
'b'
We check that the event indeed got fired:
>>> events[-1].transition.transition_id
'a_to_b'
>>> events[-1].source
'a'
>>> events[-1].destination
'b'
We will also try fireTransitionToward here, so we sneak back the workflow to state ‘a’ again and try that:
>>> state.setState('a')
Try going through a transition we cannot reach first:
>>> info.fireTransitionToward('c')
Traceback (most recent call last):
...
NoTransitionAvailableError
Now go to ‘b’ again:
>>> info.fireTransitionToward('b')
>>> state.getState()
'b'
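Firing "toward" a state amounts to looking up which transition leads from the current state to the requested one. The following stand-alone sketch illustrates that lookup; it does not import hurry.workflow, the exception classes are hypothetical stand-ins, and raising an error when several transitions match is an assumption of this sketch rather than documented behaviour.

```python
class NoTransitionAvailable(Exception):
    """Hypothetical stand-in for NoTransitionAvailableError."""


class AmbiguousTransition(Exception):
    """Hypothetical: raised by this sketch when several transitions match."""


# (transition_id, source, destination), mirroring the example above.
TRANSITIONS = [
    ('to_a', None, 'a'),
    ('a_to_b', 'a', 'b'),
    ('finalize', 'b', None),
]


def ids_toward(current, target):
    """Ids of the transitions that lead from `current` to `target`."""
    return [tid for tid, source, destination in TRANSITIONS
            if source == current and destination == target]


def fire_toward(current, target):
    """Return the new state, or raise when no single transition fits."""
    candidates = ids_toward(current, target)
    if not candidates:
        raise NoTransitionAvailable(target)
    if len(candidates) > 1:
        raise AmbiguousTransition(candidates)
    return target


reached = fire_toward('a', 'b')
try:
    fire_toward('a', 'c')
    unreachable_failed = False
except NoTransitionAvailable:
    unreachable_failed = True
```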
Finally, before forgetting about our document, we finalize the workflow:
>>> info.fireTransition('finalize')
>>> state.getState() is None
True
And we have another event that was fired:
>>> events[-1].transition.transition_id
'finalize'
>>> events[-1].source
'b'
>>> events[-1].destination is None
True
Multi-version workflow
Now let’s go for a more complicated scenario where we have multiple versions of a document. At any one time a document can have an UNPUBLISHED version and a PUBLISHED version. There can also be a CLOSED version and any number of ARCHIVED versions:
>>> UNPUBLISHED = 'unpublished'
>>> PUBLISHED = 'published'
>>> CLOSED = 'closed'
>>> ARCHIVED = 'archived'
Let’s start with a simple initial transition:
>>> init_transition = workflow.Transition(
...     transition_id='init',
...     title='Initialize',
...     source=None,
...     destination=UNPUBLISHED)
When the unpublished version is published, any previously published version is made to be the CLOSED version. To accomplish this secondary state transition, we’ll use the system’s built-in versioning ability with the ‘fireTransitionForVersions’ method, which can be used to fire transitions of other versions of the document:
>>> def PublishAction(wf, context):
...     wf.fireTransitionForVersions(PUBLISHED, 'close')
Now let’s build the transition:
>>> publish_transition = workflow.Transition(
...     transition_id='publish',
...     title='Publish',
...     source=UNPUBLISHED,
...     destination=PUBLISHED,
...     condition=NullCondition,
...     action=PublishAction,
...     trigger=interfaces.MANUAL,
...     order=1)
Next, we’ll define a transition from PUBLISHED to CLOSED, which means we want to archive whatever was closed before:
>>> def CloseAction(wf, context):
...     wf.fireTransitionForVersions(CLOSED, 'archive')
>>> close_transition = workflow.Transition(
...     transition_id='close',
...     title='Close',
...     source=PUBLISHED,
...     destination=CLOSED,
...     condition=NullCondition,
...     action=CloseAction,
...     trigger=interfaces.MANUAL,
...     order=2)
Note that CloseAction will also be executed automatically whenever state is transitioned from PUBLISHED to CLOSED using fireTransitionForVersions. This means that publishing a document results in the previously closed document being archived.
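The cascade described here (publishing closes the previously published version, which in turn archives the previously closed one) can be traced with a stand-alone sketch. It does not use hurry.workflow: versions are plain dicts, and fire_for_versions is a hypothetical helper that only imitates the role of the versioning method used in the actions.

```python
# Stand-alone sketch of the cascade; it does not use hurry.workflow.
UNPUBLISHED, PUBLISHED, CLOSED, ARCHIVED = (
    'unpublished', 'published', 'closed', 'archived')

# Each version is a plain dict here; real code would use content objects.
versions = [
    {'name': 'v1', 'state': CLOSED},
    {'name': 'v2', 'state': PUBLISHED},
    {'name': 'v3', 'state': UNPUBLISHED},
]


def fire_for_versions(state, transition):
    """Apply `transition` to every version currently in `state`."""
    # Snapshot the matches first, so versions that enter `state`
    # during the cascade are not picked up by the same call.
    for version in [v for v in versions if v['state'] == state]:
        transition(version)


def archive(version):
    version['state'] = ARCHIVED


def close(version):
    # Mirrors CloseAction: archive whatever was closed before.
    fire_for_versions(CLOSED, archive)
    version['state'] = CLOSED


def publish(version):
    # Mirrors PublishAction: close whatever was published before.
    fire_for_versions(PUBLISHED, close)
    version['state'] = PUBLISHED


publish(versions[2])
states = [v['state'] for v in versions]
```

In this sketch each action runs before the version's own state changes, so the slot it is about to occupy is emptied first and no two versions share the PUBLISHED or CLOSED state.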
If there is a PUBLISHED but no UNPUBLISHED version, we can make a new copy of the PUBLISHED version and make that the UNPUBLISHED version:
>>> def CanCopyCondition(wf, context):
...     return not wf.hasVersion(UNPUBLISHED)
Since we are actually creating a new content object, the action should return the newly created object with the new state:
>>> def CopyAction(wf, context):
...     return Document('copy of %s' % context.title)
>>> copy_transition = workflow.Transition(
...     transition_id='copy',
...     title='Copy',
...     source=PUBLISHED,
...     destination=UNPUBLISHED,
...     condition=CanCopyCondition,
...     action=CopyAction,
...     trigger=interfaces.MANUAL,
...     order=3)
A very similar transition applies to the closed version. If we have no UNPUBLISHED version and no PUBLISHED version, we can make a new copy from the CLOSED version:
>>> def CanCopyCondition(wf, context):
...     return (not wf.hasVersion(UNPUBLISHED) and
...             not wf.hasVersion(PUBLISHED))
>>> copy_closed_transition = workflow.Transition(
...     transition_id='copy_closed',
...     title='Copy',
...     source=CLOSED,
...     destination=UNPUBLISHED,
...     condition=CanCopyCondition,
...     action=CopyAction,
...     trigger=interfaces.MANUAL,
...     order=4)
Finally let’s build the archiving transition:
>>> archive_transition = workflow.Transition(
...     transition_id='archive',
...     title='Archive',
...     source=CLOSED,
...     destination=ARCHIVED,
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL,
...     order=5)
Now let’s build and provide the workflow utility:
>>> wf = workflow.Workflow([init_transition,
...     publish_transition, close_transition,
...     copy_transition, copy_closed_transition,
...     archive_transition])
>>> from zope.app.testing import ztapi
>>> ztapi.provideUtility(interfaces.IWorkflow, wf)
Let’s get the workflow_versions utility which we can use to track versions and come up with a new unique id:
>>> from zope.app import zapi
>>> workflow_versions = zapi.getUtility(interfaces.IWorkflowVersions)
And let’s start with a document:
>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')
We need the document id to compare later when we create a new version:
>>> state = interfaces.IWorkflowState(document)
>>> document_id = state.getId()
Let’s add it to the workflow versions container so we can find it. Note that we’re using a private API here; this could be implemented as adding it to a folder or any other way, as long as getVersions() works later:
>>> workflow_versions.addVersion(document) # private API
Also clear out previously recorded events:
>>> del events[:]
We can publish it:
>>> info.getManualTransitionIds()
['publish']
So let’s do that:
>>> info.fireTransition('publish')
>>> state.getState()
'published'
The last event should be the ‘publish’ transition:
>>> events[-1].transition.transition_id
'publish'
And now we can either close or create a new copy of it. Note that the names are sorted using the order of the transitions:
>>> info.getManualTransitionIds()
['close', 'copy']
Let’s close it:
>>> info.fireTransition('close')
>>> state.getState()
'closed'
We’re going to create a new copy for editing now:
>>> info.getManualTransitionIds()
['copy_closed', 'archive']
>>> document2 = info.fireTransition('copy_closed')
>>> workflow_versions.addVersion(document2) # private API to track it
>>> document2.title
'copy of bar'
>>> state = interfaces.IWorkflowState(document2)
>>> state.getState()
'unpublished'
>>> state.getId() == document_id
True
The original version is still there in its original state:
>>> interfaces.IWorkflowState(document).getState()
'closed'
Let’s also check the last event in some detail:
>>> event = events[-1]
>>> event.transition.transition_id
'copy_closed'
>>> event.old_object == document
True
>>> event.object == document2
True
Now we are going to publish the new version:
>>> info = interfaces.IWorkflowInfo(document2)
>>> info.getManualTransitionIds()
['publish']
>>> info.fireTransition('publish')
>>> interfaces.IWorkflowState(document2).getState()
'published'
The original is still closed:
>>> interfaces.IWorkflowState(document).getState()
'closed'
Now let’s publish another copy after this:
>>> document3 = info.fireTransition('copy')
>>> workflow_versions.addVersion(document3)
>>> interfaces.IWorkflowInfo(document3).fireTransition('publish')
This copy is now published:
>>> interfaces.IWorkflowState(document3).getState()
'published'
And the previously published version is now closed:
>>> interfaces.IWorkflowState(document2).getState()
'closed'
Note that due to the condition, it’s not possible to copy from the closed version, as there is a published version still remaining:
>>> interfaces.IWorkflowInfo(document2).getManualTransitionIds()
['archive']
Meanwhile, the original version, previously closed, is now archived:
>>> interfaces.IWorkflowState(document).getState()
'archived'
Automatic transitions
Now let’s try a workflow transition that is automatic and time-based. We’ll set up a very simple workflow between ‘unpublished’ and ‘published’, and have the ‘publish’ transition be time-based.
To simulate time, we have moments:
>>> time_moment = 0
We will only publish if time_moment is greater than 3:
>>> def TimeCondition(wf, context):
...     return time_moment > 3
Set up the transition using this condition; note that this one is automatic, i.e. it doesn’t have to be triggered by humans:
>>> publish_transition = workflow.Transition(
...     transition_id='publish',
...     title='Publish',
...     source=UNPUBLISHED,
...     destination=PUBLISHED,
...     condition=TimeCondition,
...     action=NullAction,
...     trigger=interfaces.AUTOMATIC)
Set up the workflow using this transition, and reusing the init transition we defined before:
>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> ztapi.provideUtility(interfaces.IWorkflow, wf)
Clear out all versions; this is a private API we just use for demonstration purposes:
>>> workflow_versions.clear()
Now create a document:
>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')
Private again; do this with the catalog or any way you prefer in your own code:
>>> workflow_versions.addVersion(document)
Since this transition is automatic, we should see it like this:
>>> interfaces.IWorkflowInfo(document).getAutomaticTransitionIds()
['publish']
Now let’s fire any automatic transitions:
>>> workflow_versions.fireAutomatic()
Nothing should have happened as we are still at time moment 0:
>>> state = interfaces.IWorkflowState(document)
>>> state.getState()
'unpublished'
We change the time moment past 3:
>>> time_moment = 4
Now fire any automatic transitions again:
>>> workflow_versions.fireAutomatic()
The transition has fired, so the state will be ‘published’:
>>> state.getState()
'published'
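The time-based behaviour walked through in this section can be reproduced in a stand-alone sketch. It does not import hurry.workflow; the clock dict, the documents list and the fire_automatic function are made-up names, and stopping after the first matching transition per document is an assumption of the sketch.

```python
# Stand-alone sketch; `clock`, `documents` and `fire_automatic` are made up.
clock = {'moment': 0}

documents = [{'title': 'bar', 'state': 'unpublished'}]


def time_condition(document):
    # Same rule as TimeCondition: allowed once the moment is past 3.
    return clock['moment'] > 3


# (source, destination, condition) for every automatic transition.
AUTOMATIC_TRANSITIONS = [('unpublished', 'published', time_condition)]


def fire_automatic():
    """Fire each automatic transition whose condition currently holds."""
    for document in documents:
        for source, destination, condition in AUTOMATIC_TRANSITIONS:
            if document['state'] == source and condition(document):
                document['state'] = destination
                break  # at most one transition per document per pass


fire_automatic()
state_at_moment_0 = documents[0]['state']

clock['moment'] = 4
fire_automatic()
state_at_moment_4 = documents[0]['state']
```

A pass like this would typically be run periodically; the condition decides on each pass whether anything happens.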
System transitions
Let’s try system transitions now. This transition shouldn’t show up as either manual or automatic:
>>> publish_transition = workflow.Transition(
...     transition_id='publish',
...     title='Publish',
...     source=UNPUBLISHED,
...     destination=PUBLISHED,
...     trigger=interfaces.SYSTEM)
Set up the workflow using this transition, and reusing the init transition we defined before:
>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> ztapi.provideUtility(interfaces.IWorkflow, wf)
Clear out all versions; this is a private API we just use for demonstration purposes:
>>> workflow_versions.clear()
Now create a document:
>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')
Private again; do this with the catalog or any way you prefer in your own code:
>>> workflow_versions.addVersion(document)
We should see it as a system transition:
>>> info.getSystemTransitionIds()
['publish']
but not as automatic or manual:
>>> info.getAutomaticTransitionIds()
[]
>>> info.getManualTransitionIds()
[]
This transition can be fired:
>>> info.fireTransition('publish')
>>> interfaces.IWorkflowState(document).getState()
'published'
Multiple transitions
It’s possible to have multiple transitions from the source state to the target state, for instance an automatic and a manual one.
Let’s set up a workflow with two manual transitions and a single automatic transition between two states:
>>> publish_1_transition = workflow.Transition(
...     transition_id='publish 1',
...     title='Publish 1',
...     source=UNPUBLISHED,
...     destination=PUBLISHED,
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL)
>>> publish_2_transition = workflow.Transition(
...     transition_id='publish 2',
...     title='Publish 2',
...     source=UNPUBLISHED,
...     destination=PUBLISHED,
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL)
>>> publish_auto_transition = workflow.Transition(
...     transition_id='publish auto',
...     title='Publish Auto',
...     source=UNPUBLISHED,
...     destination=PUBLISHED,
...     condition=TimeCondition,
...     action=NullAction,
...     trigger=interfaces.AUTOMATIC)
Clear out all versions; this is a private API we just use for demonstration purposes:
>>> workflow_versions.clear()
Since we’re using the time condition again, let’s make sure time is at 0 again so that the publish_auto_transition doesn’t fire:
>>> time_moment = 0
Now set up the workflow using these transitions, plus our init_transition:
>>> wf = workflow.Workflow([init_transition,
...     publish_1_transition, publish_2_transition,
...     publish_auto_transition])
>>> ztapi.provideUtility(interfaces.IWorkflow, wf)
Now create a document:
>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')
We should have two manual transitions:
>>> sorted(interfaces.IWorkflowInfo(document).getManualTransitionIds())
['publish 1', 'publish 2']
And a single automatic transition:
>>> interfaces.IWorkflowInfo(document).getAutomaticTransitionIds()
['publish auto']
Protecting transitions with permissions
Transitions can be (and should be) protected with a permission, so that not everybody can execute them.
Let’s set up a workflow with a transition that has a permission:
>>> publish_transition = workflow.Transition(
...     transition_id='publish',
...     title='Publish',
...     source=UNPUBLISHED,
...     destination=PUBLISHED,
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL,
...     permission="zope.ManageContent")
Quickly set up the workflow state again for a document:
>>> workflow_versions.clear()
>>> wf = workflow.Workflow([init_transition, publish_transition])
>>> ztapi.provideUtility(interfaces.IWorkflow, wf)
>>> document = Document('bar')
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')
Let’s set up the security context:
>>> from zope.security.interfaces import Unauthorized
>>> from zope.security.management import newInteraction, endInteraction
>>> class Principal:
...     def __init__(self, id):
...         self.id = id
...         self.groups = []
>>> class Participation:
...     interaction = None
...     def __init__(self, principal):
...         self.principal = principal
>>> endInteraction() # XXX argh, apparently one is already active?
>>> newInteraction(Participation(Principal('bob')))
We shouldn’t see this transition appear in our list of possible transitions, as we do not have access:
>>> info.getManualTransitionIds()
[]
Now let’s try firing the transition. It should fail with Unauthorized:
>>> try:
...     info.fireTransition('publish')
... except Unauthorized:
...     print "Got unauthorized"
Got unauthorized
The system user is however allowed to do it:
>>> from zope.security.management import system_user
>>> endInteraction()
>>> newInteraction(Participation(system_user))
>>> info.fireTransition('publish')
And this goes off without a problem.
There is also a special way to make it happen, by passing check_security=False to fireTransition:
>>> endInteraction()
>>> newInteraction(Participation(Principal('bob')))
>>> interfaces.IWorkflowState(document).setState(UNPUBLISHED)
>>> info.fireTransition('publish', check_security=False)
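The effect of a permission check with an opt-out flag can be shown in a stand-alone sketch. It does not use zope.security or hurry.workflow: the Unauthorized class, the GRANTS table and the fire function are hypothetical, and real permission checks go through the security policy of the running interaction rather than a dict.

```python
class Unauthorized(Exception):
    """Hypothetical stand-in for zope.security.interfaces.Unauthorized."""


# Hypothetical permission table: principal id -> granted permissions.
GRANTS = {'editor': {'zope.ManageContent'}, 'bob': set()}


def fire(document, principal, permission, destination, check_security=True):
    """Move `document` to `destination` if `principal` may do so."""
    if check_security and permission not in GRANTS.get(principal, set()):
        raise Unauthorized(permission)
    document['state'] = destination


document = {'state': 'unpublished'}

try:
    fire(document, 'bob', 'zope.ManageContent', 'published')
    bob_was_refused = False
except Unauthorized:
    bob_was_refused = True
state_after_refusal = document['state']

# Skipping the check lets trusted code fire the transition on bob's behalf.
fire(document, 'bob', 'zope.ManageContent', 'published', check_security=False)
state_after_bypass = document['state']
```

Because the bypass skips the check entirely, it belongs in trusted code paths only and not in anything driven directly by user input.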
Side effects during transitions
Sometimes we would like something to get executed before the WorkflowTransitionEvent is fired, but after a (potential) new version of the object has been created. If an object is edited during the same request as a workflow transition, the editing should take place after a potential new version has been created, otherwise the old, not the new, version will be edited.
If something like a history logger hooks into IWorkflowTransitionEvent, however, it would get information about the new copy before the editing took place. To allow editing to take place between the creation of the new copy and the firing of the event, a side effect function can be passed along when a transition is fired.
The sequence of execution then is:
firing of transition itself, creating a new version
executing the side effect function on the new version
firing the IWorkflowTransitionEvent
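The three steps above can be traced with a stand-alone sketch that records the order of calls. It does not import hurry.workflow; the fire and notify functions and the log list are hypothetical and only illustrate the ordering, not the package's code.

```python
log = []


def copy_action(document):
    # Step 1: the transition's action creates the new version.
    log.append('action')
    return {'title': 'copy of %s' % document['title']}


def side_effect(new_version):
    # Step 2: edit the new version before anybody is notified.
    log.append('side effect')
    new_version['title'] += '!'


def notify(new_version):
    # Step 3: subscribers already see the edited title.
    log.append('event saw %r' % new_version['title'])


def fire(document, action, side_effect=None):
    new_version = action(document)
    if side_effect is not None:
        side_effect(new_version)
    notify(new_version)
    return new_version


original = {'title': 'bar'}
new_version = fire(original, copy_action, side_effect=side_effect)
```

The old version is never touched, and an event subscriber such as a history logger observes the new version only after the edit has been applied.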
Let’s set up a very simple workflow:
>>> foo_transition = workflow.Transition(
...     transition_id='foo',
...     title='Foo',
...     source=UNPUBLISHED,
...     destination=PUBLISHED,
...     condition=NullCondition,
...     action=CopyAction,
...     trigger=interfaces.MANUAL)
Quickly set up the workflow state again for a document:
>>> workflow_versions.clear()
>>> wf = workflow.Workflow([init_transition, foo_transition])
>>> ztapi.provideUtility(interfaces.IWorkflow, wf)
>>> document = Document('bar')
>>> events = []
>>> info = interfaces.IWorkflowInfo(document)
>>> info.fireTransition('init')
Now let’s set up a side effect:
>>> def side_effect(context):
...     context.title = context.title + '!'
Now fire the transition, with a side effect:
>>> new_version = info.fireTransition('foo', side_effect=side_effect)
The title of the new version should now have a ! at the end:
>>> new_version.title[-1] == '!'
True
But the old version doesn’t:
>>> document.title[-1] == '!'
False
The events list we set up before should contain two events:
>>> len(events)
2
>>> events[1].object.title[-1] == '!'
True