State Machine Workflow Engine
The ore workflow system is based on hurry.workflow, written by Martijn Faassen, which was originally a “roll my own because I’m in a hurry” framework. ORE Workflow offers a number of improvements to make it useful for real-world applications. Chief among these is the ability to have more than one workflow per site, via adaptation-based workflows instead of the original utility workflows. Additionally, it supports parallel workflows on a single context. This version was forked after several months of waiting for upstream to accept patches with tests.
It also greatly simplifies the machinery and dependencies, resulting in a simple-to-use state machine workflow for Python with minimal dependencies.
Basic workflow
Let’s first make a content object that can go into a workflow:
>>> from zope.interface import implements, Attribute, Interface
>>> class IDocument(Interface):
...     title = Attribute('Title')
>>> class Document(object):
...     implements(IDocument)
...     def __init__(self, title):
...         self.title = title
As you can see, such a content object must provide IAnnotatable, as this is used to store the workflow state. The system uses the IWorkflowState adapter to get and set an object’s workflow state:
>>> from ore.workflowed import interfaces
>>> document = Document('Foo')
>>> state = interfaces.IWorkflowState(document)
>>> print state.getState()
None
The state can be set directly for an object using the IWorkflowState adapter as well:
>>> state.setState('foo')
>>> state.getState()
'foo'
But let’s set it back to None again, so we can start again in a pristine state for this document:
>>> state.setState(None)
It’s not recommended to call setState() ourselves, though: usually we’ll let the workflow system take care of state transitions and the setting of the initial state.
Now let’s define a simple workflow transition from ‘a’ to ‘b’. It needs a condition which must return True before the transition is allowed to occur:
>>> def NullCondition(wf, context):
...     return True
and an action that takes place when the transition is taken:
>>> def NullAction(wf, context):
...     pass
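The null condition and action above do nothing of interest. As a minimal sketch of what a non-trivial pair might look like, assuming only the `(wf, context)` call signature shown above, the following uses a hypothetical `Draft` class and hypothetical `has_title` and `stamp_reviewed` functions. None of these names are part of ore.workflowed.

```python
class Draft(object):
    """Hypothetical content object, used only for this illustration."""

    def __init__(self, title):
        self.title = title
        self.reviewed = False


def has_title(wf, context):
    # Condition: allow the transition only when the title is non-blank.
    return bool(context.title and context.title.strip())


def stamp_reviewed(wf, context):
    # Action: a side effect performed when the transition is taken.
    context.reviewed = True


draft = Draft('Quarterly report')
# Neither function uses the workflow argument, so None stands in for it here.
if has_title(None, draft):
    stamp_reviewed(None, draft)
```

Keeping the condition free of side effects and putting all mutation in the action means the condition can be evaluated repeatedly, for example when listing which transitions are currently available.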
Now let’s construct a transition:
>>> from ore.workflowed import workflow
>>> transition = workflow.Transition(
...     transition_id='a_to_b',
...     title='A to B',
...     source='a',
...     destination='b',
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL)
The transition trigger is either MANUAL, AUTOMATIC or SYSTEM. MANUAL indicates that user action is needed to fire the transition. AUTOMATIC transitions fire automatically. SYSTEM transitions are fired directly by the system, not by the user.
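To illustrate how a trigger kind can be used to select transitions, here is a small plain-Python sketch. The constant values, the `Transition` namedtuple, the sample transitions, and `ids_by_trigger` are all hypothetical stand-ins chosen for this example; the real constants live in `ore.workflowed.interfaces` and may have different values.

```python
from collections import namedtuple

# Hypothetical stand-ins for the trigger constants (assumed values).
MANUAL, AUTOMATIC, SYSTEM = 'manual', 'automatic', 'system'

# Hypothetical, simplified transition record for this sketch only.
Transition = namedtuple(
    'Transition', 'transition_id source destination trigger')

TRANSITIONS = [
    Transition('submit', 'draft', 'pending', MANUAL),
    Transition('expire', 'pending', 'draft', AUTOMATIC),
    Transition('archive', 'pending', 'archived', SYSTEM),
]


def ids_by_trigger(transitions, source, trigger):
    """Return ids of transitions leaving `source` with the given trigger."""
    return [t.transition_id for t in transitions
            if t.source == source and t.trigger == trigger]


manual_from_draft = ids_by_trigger(TRANSITIONS, 'draft', MANUAL)
manual_from_pending = ids_by_trigger(TRANSITIONS, 'pending', MANUAL)
system_from_pending = ids_by_trigger(TRANSITIONS, 'pending', SYSTEM)
```

A user interface would typically offer only the MANUAL ids for the current state, which is why `manual_from_pending` is empty in this sample data.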
We also will introduce an initial transition, that moves an object into the workflow (for instance just after it is created):
>>> init_transition = workflow.Transition(
...     transition_id='to_a',
...     title='Create A',
...     source=None,
...     destination='a')
And a final transition, when the object moves out of the workflow again (for instance just before it is deleted):
>>> final_transition = workflow.Transition(
...     transition_id='finalize',
...     title='Delete',
...     source='b',
...     destination=None)
Now let’s put the transitions in a workflow and register it as an IWorkflow adapter for IDocument:
>>> wf = workflow.Workflow([transition, init_transition, final_transition])
>>> from zope.component import getGlobalSiteManager
>>> globalSiteManager = getGlobalSiteManager()
>>> globalSiteManager.registerAdapter(wf, (IDocument,), interfaces.IWorkflow)
Workflow transitions cause events to be fired; we will put in a simple handler so we can check whether things were successfully fired:
>>> events = []
>>> def transition_handler(event):
...     events.append(event)
>>> globalSiteManager.registerHandler(transition_handler, [interfaces.IWorkflowTransitionEvent], None)
To find out which transitions to other states are possible for an object, as well as to fire transitions and set the initial state, we use the IWorkflowInfo adapter:
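The handler above only collects events. As a rough sketch of how such events could feed an audit trail, the following plain-Python example uses a hypothetical `TransitionEvent` class, a hypothetical `notify` function, and a plain list standing in for the component registry. It is not the event machinery used by ore.workflowed, and the event here carries `transition_id` directly rather than a full transition object.

```python
class TransitionEvent(object):
    """Hypothetical event with a transition id, source and destination."""

    def __init__(self, transition_id, source, destination):
        self.transition_id = transition_id
        self.source = source
        self.destination = destination


handlers = []  # stand-in for subscribers held by a component registry


def notify(event):
    # Deliver the event to every registered handler, in registration order.
    for handler in handlers:
        handler(event)


audit_log = []


def audit_handler(event):
    # Record one human-readable line per transition.
    audit_log.append('%s: %r -> %r' % (
        event.transition_id, event.source, event.destination))


handlers.append(audit_handler)
notify(TransitionEvent('to_a', None, 'a'))
notify(TransitionEvent('a_to_b', 'a', 'b'))
```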
>>> info = interfaces.IWorkflowInfo(document)
We’ll initialize the workflow by firing the ‘to_a’ transition:
>>> info.fireTransition('to_a')
This should’ve fired an event:
>>> events[-1].transition.transition_id
'to_a'
>>> events[-1].source is None
True
>>> events[-1].destination
'a'
There’s only a single transition defined to workflow state ‘b’:
>>> info.getManualTransitionIds()
['a_to_b']
We can also get this by asking which manual (or system) transition exists that brings us to the desired workflow state:
>>> info.getFireableTransitionIdsToward('b')
['a_to_b']
Since this is a manually triggered transition, we can fire this transition:
>>> info.fireTransition('a_to_b')
The workflow state should now be ‘b’:
>>> state.getState()
'b'
We check that the event indeed got fired:
>>> events[-1].transition.transition_id
'a_to_b'
>>> events[-1].source
'a'
>>> events[-1].destination
'b'
We will also try fireTransitionToward here, so we sneak the workflow back to state ‘a’ and try that:
>>> state.setState('a')
Try going through a transition we cannot reach first:
>>> info.fireTransitionToward('c')
Traceback (most recent call last):
...
NoTransitionAvailableError
Now go to ‘b’ again:
>>> info.fireTransitionToward('b')
>>> state.getState()
'b'
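The idea behind firing a transition toward a target state can be sketched in plain Python as a lookup from the current state and the desired state to a transition id. This is a simplified illustration, not the library's implementation: the `TRANSITIONS` table, the `transition_toward` function, and the local `NoTransitionAvailableError` class are hypothetical, and the sketch assumes at most one transition per pair of states and ignores conditions and triggers.

```python
class NoTransitionAvailableError(Exception):
    """Hypothetical stand-in for the error shown in the traceback above."""


# (source, destination) -> transition id, mirroring the three transitions
# defined earlier in this walkthrough.
TRANSITIONS = {
    (None, 'a'): 'to_a',
    ('a', 'b'): 'a_to_b',
    ('b', None): 'finalize',
}


def transition_toward(current_state, target_state):
    """Return the id of the transition from current_state to target_state."""
    try:
        return TRANSITIONS[(current_state, target_state)]
    except KeyError:
        raise NoTransitionAvailableError(
            'no transition from %r to %r' % (current_state, target_state))


reachable = transition_toward('a', 'b')
```

Asking for an unreachable state, such as `transition_toward('a', 'c')`, raises the error instead of silently doing nothing.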
Finally, before forgetting about our document, we finalize the workflow:
>>> info.fireTransition('finalize')
>>> state.getState() is None
True
And we have another event that was fired:
>>> events[-1].transition.transition_id
'finalize'
>>> events[-1].source
'b'
>>> events[-1].destination is None
True
Adaptation based workflow
We can also define adaptation-based workflows, with workflows registered as adapters.
>>> class IInvoiceDocument(IDocument):
...     title = Attribute('Title')

>>> class InvoiceDocument(object):
...     implements(IInvoiceDocument)
...     def __init__(self, title, amount):
...         self.title = title
...         self.amount = amount
Having defined a class to which the workflow will be applied, we now define the workflow itself.
>>> invoice_init = workflow.Transition(
...     transition_id='init_invoice',
...     title='Invoice Received',
...     source=None,
...     destination='received')
>>> invoice_paid = workflow.Transition(
...     transition_id='invoice_paid',
...     title='Invoice Paid',
...     source='received',
...     destination='paid')

>>> invoice_wf = workflow.Workflow( [ invoice_init, invoice_paid ] )
We convert our utility workflow to an adapter and register it.
>>> adapted_workflow = workflow.AdaptedWorkflow( invoice_wf )
>>> globalSiteManager.registerAdapter( adapted_workflow, (IInvoiceDocument,), interfaces.IWorkflow )
Now we can use the workflow.
>>> invoice = InvoiceDocument('abc', 22)
>>> isinstance( interfaces.IWorkflow( invoice ), workflow.AdaptedWorkflowBase )
True

>>> interfaces.IWorkflowInfo( invoice ).fireTransition('init_invoice')
>>> interfaces.IWorkflowState( invoice ).getState()
'received'
>>> interfaces.IWorkflowInfo( invoice ).fireTransition('invoice_paid')
>>> interfaces.IWorkflowState( invoice ).getState()
'paid'
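The effect of adaptation-based lookup, where the workflow is chosen by what kind of object is being adapted, can be approximated in plain Python. The sketch below substitutes class inheritance for interface-based adapter lookup, so it is only an analogy and not how zope.component resolves adapters. The `WORKFLOWS` registry and `workflow_for` function are hypothetical names.

```python
class Document(object):
    pass


class InvoiceDocument(Document):
    pass


class Memo(object):
    pass


# Hypothetical registry mapping a type to a label for its workflow.
WORKFLOWS = {
    Document: 'document workflow',
    InvoiceDocument: 'invoice workflow',
}


def workflow_for(obj):
    """Walk the method resolution order; the most specific match wins."""
    for klass in type(obj).__mro__:
        if klass in WORKFLOWS:
            return WORKFLOWS[klass]
    raise LookupError('no workflow registered for %r' % (type(obj),))


invoice_workflow = workflow_for(InvoiceDocument())
document_workflow = workflow_for(Document())
```

Because the lookup starts from the most specific type, several workflows can coexist in one site and each object picks up the one registered closest to it.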
Parallel Workflows
Another interesting feature is the ability to have multiple workflows active on a single object. Each of the workflows is set up as a set of named adapters for IWorkflow, IWorkflowInfo, and IWorkflowState respectively. It does mean, however, that we need to qualify lookups of the workflow adapters with the workflow name.
Let’s use our two workflows already defined as an example. As a convenience, the ParallelWorkflow constructor will register the three workflow components as appropriately named adapters if passed an Interface that they’re applicable for.
>>> class IWorkDocument(IDocument):
...     title = Attribute('Title')
>>> class WorkDocument( Document ):
...     implements( IWorkDocument )

>>> invoice_adapters = workflow.ParallelWorkflow( invoice_wf, "invoice", IWorkDocument )
>>> doc_adapters = workflow.ParallelWorkflow( wf, "doc", IWorkDocument )

>>> wdoc = WorkDocument('text')
>>> from zope.component import getAdapter
>>> getAdapter( wdoc, interfaces.IWorkflowInfo, "doc").fireTransition('to_a')
>>> getAdapter( wdoc, interfaces.IWorkflowState, "doc").getState()
'a'
>>> getAdapter( wdoc, interfaces.IWorkflowInfo, "invoice").fireTransition('init_invoice')
>>> getAdapter( wdoc, interfaces.IWorkflowState, "invoice").getState()
'received'
>>> getAdapter( wdoc, interfaces.IWorkflowState, "doc").getState()
'a'
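The key property shown above is that each named workflow keeps its own state, so advancing one leaves the other untouched. A minimal plain-Python sketch of that idea follows. The `ParallelState` class is hypothetical and says nothing about how ore.workflowed actually stores state; it only illustrates keying state by workflow name.

```python
class ParallelState(object):
    """Hypothetical per-object store holding one state per workflow name."""

    def __init__(self):
        self._states = {}

    def get_state(self, workflow_name):
        # A workflow that was never started reports None, like a fresh object.
        return self._states.get(workflow_name)

    def set_state(self, workflow_name, state):
        self._states[workflow_name] = state


states = ParallelState()
states.set_state('doc', 'a')
states.set_state('invoice', 'received')
states.set_state('invoice', 'paid')  # advancing 'invoice' leaves 'doc' alone
```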