State Machine Workflow Engine

The ore workflow system is based on hurry.workflow, written by Martijn Faassen, which was originally a “roll my own because I’m in a hurry” framework. ORE Workflow offers a number of improvements to make it useful for real-world applications. Chief among these is the ability to have more than one workflow per site, via adaptation-based workflows instead of the original utility workflows. It also supports parallel workflows on a single context. This version was forked after several months of waiting for upstream to accept patches with tests.

This greatly simplifies the machinery and dependencies, yielding a simple-to-use state machine workflow for Python with minimal dependencies.

Basic workflow

Let’s first make a content object that can go into a workflow:

>>> from zope.interface import implements, Attribute, Interface

>>> class IDocument(Interface):
...    title = Attribute('Title')
>>> class Document(object):
...    implements(IDocument)
...    def __init__(self, title):
...        self.title = title

As you can see, such a content object must provide IAnnotatable, as this is used to store the workflow state. The system uses the IWorkflowState adapter to get and set an object’s workflow state:

>>> from ore.workflowed import interfaces
>>> document = Document('Foo')
>>> state = interfaces.IWorkflowState(document)
>>> print state.getState()
None

The state can be set directly for an object using the IWorkflowState adapter as well:

>>> state.setState('foo')
>>> state.getState()
'foo'

But let’s set it back to None again, so we can start again in a pristine state for this document:

>>> state.setState(None)

It’s not recommended to use setState() to do this ourselves, though: usually we’ll let the workflow system take care of state transitions and the setting of the initial state.

Now let’s define a simple workflow transition from ‘a’ to ‘b’. It needs a condition which must return True before the transition is allowed to occur:

>>> def NullCondition(wf, context):
...    return True

and an action that takes place when the transition is taken:

>>> def NullAction(wf, context):
...    pass

Now let’s construct a transition:

>>> from ore.workflowed import workflow
>>> transition = workflow.Transition(
...     transition_id='a_to_b',
...     title='A to B',
...     source='a',
...     destination='b',
...     condition=NullCondition,
...     action=NullAction,
...     trigger=interfaces.MANUAL)

The transition trigger is either MANUAL, AUTOMATIC or SYSTEM. MANUAL indicates user action is needed to fire the transition. AUTOMATIC transitions fire automatically. SYSTEM is a workflow transition directly fired by the system, and not directly by the user.
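
To make the roles of the condition, action, and trigger concrete, here is a minimal, library-free sketch. It is not the ore.workflowed implementation: SketchTransition, try_fire, and the string values of the trigger constants are hypothetical stand-ins chosen for illustration, and the real constants in ore.workflowed.interfaces may differ.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

# Hypothetical trigger constants; the real ones live in
# ore.workflowed.interfaces and may have different values.
MANUAL, AUTOMATIC, SYSTEM = "manual", "automatic", "system"


@dataclass
class SketchTransition:
    """Illustrative stand-in for a workflow transition."""
    transition_id: str
    title: str
    source: Optional[str]
    destination: Optional[str]
    # By default the condition always allows and the action does nothing.
    condition: Callable[[Any, Any], bool] = lambda wf, context: True
    action: Callable[[Any, Any], None] = lambda wf, context: None
    trigger: str = MANUAL


def try_fire(transition, wf, context, current_state):
    """Return the new state, or raise ValueError if firing is not allowed."""
    if transition.source != current_state:
        raise ValueError("transition does not start at %r" % (current_state,))
    if not transition.condition(wf, context):
        raise ValueError("condition refused the transition")
    transition.action(wf, context)  # side effects happen only once allowed
    return transition.destination
```

In this sketch the condition is checked before the action runs, so a refused transition leaves no side effects behind.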

We also will introduce an initial transition, that moves an object into the workflow (for instance just after it is created):

>>> init_transition = workflow.Transition(
...     transition_id='to_a',
...     title='Create A',
...     source=None,
...     destination='a')

And a final transition, when the object moves out of the workflow again (for instance just before it is deleted):

>>> final_transition = workflow.Transition(
...     transition_id='finalize',
...     title='Delete',
...     source='b',
...     destination=None)

Now let’s put the transitions in a workflow and register it as an adapter for IDocument:

>>> wf = workflow.Workflow([transition, init_transition, final_transition])
>>> from zope.component import getGlobalSiteManager
>>> globalSiteManager = getGlobalSiteManager()
>>> globalSiteManager.registerAdapter( wf, (IDocument,), interfaces.IWorkflow)

Workflow transitions cause events to be fired; we will put in a simple handler so we can check whether things were successfully fired:

>>> events = []
>>> def transition_handler(event):
...     events.append(event)
>>> globalSiteManager.registerHandler(transition_handler, [interfaces.IWorkflowTransitionEvent], None)

To get what transitions to other states are possible from an object, as well as to fire transitions and set initial state, we use the IWorkflowInfo adapter:

>>> info = interfaces.IWorkflowInfo(document)

We’ll initialize the workflow by firing the ‘to_a’ transition:

>>> info.fireTransition('to_a')

This should’ve fired an event:

>>> events[-1].transition.transition_id
'to_a'
>>> events[-1].source is None
True
>>> events[-1].destination
'a'
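
The handler pattern used above can be sketched without zope.component. This is only an analogy under stated assumptions: TransitionEvent, handlers, and notify are hypothetical names, and the real event objects carry a transition object rather than a bare id.

```python
from collections import namedtuple

# Hypothetical event record: who moved, from where, to where, via what.
TransitionEvent = namedtuple(
    "TransitionEvent", "context source destination transition_id")

handlers = []  # subscribers, called in registration order


def notify(event):
    """Deliver an event to every registered handler."""
    for handler in list(handlers):
        handler(event)


# Register a handler that records events, then simulate the initial transition.
seen = []
handlers.append(seen.append)
notify(TransitionEvent(context="document", source=None,
                       destination="a", transition_id="to_a"))
```

After this runs, seen holds one event whose source is None and whose destination is 'a', mirroring the checks in the doctest above.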

There’s only a single transition defined to workflow state ‘b’:

>>> info.getManualTransitionIds()
['a_to_b']

We can also get this by asking which manual (or system) transition exists that brings us to the desired workflow state:

>>> info.getFireableTransitionIdsToward('b')
['a_to_b']

Since this is a manually triggered transition, we can fire this transition:

>>> info.fireTransition('a_to_b')

The workflow state should now be ‘b’:

>>> state.getState()
'b'

We check that the event indeed got fired:

>>> events[-1].transition.transition_id
'a_to_b'
>>> events[-1].source
'a'
>>> events[-1].destination
'b'

We will also try fireTransitionToward here, so we sneak the workflow back to state ‘a’ and try that:

>>> state.setState('a')

First, try heading toward a state we cannot reach:

>>> info.fireTransitionToward('c')
Traceback (most recent call last):
...
NoTransitionAvailableError

Now go to ‘b’ again:

>>> info.fireTransitionToward('b')
>>> state.getState()
'b'
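
Conceptually, firing toward a state means looking up a transition whose source is the current state and whose destination is the requested one. The sketch below illustrates that lookup only. NoTransitionAvailable, TRANSITIONS, and both functions are hypothetical, and picking the first match is an assumption of this sketch, not documented library behaviour.

```python
class NoTransitionAvailable(Exception):
    """Hypothetical stand-in for the library's NoTransitionAvailableError."""


# (transition_id, source, destination), mirroring the example workflow.
TRANSITIONS = [
    ("to_a", None, "a"),
    ("a_to_b", "a", "b"),
    ("finalize", "b", None),
]


def fireable_ids_toward(current, destination):
    """Ids of transitions leading from `current` to `destination`."""
    return [tid for tid, src, dst in TRANSITIONS
            if src == current and dst == destination]


def fire_toward(current, destination):
    """Return (transition_id, new_state), or raise if nothing matches."""
    ids = fireable_ids_toward(current, destination)
    if not ids:
        raise NoTransitionAvailable(destination)
    return ids[0], destination  # assumption: take the first match
```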

Finally, before forgetting about our document, we finalize the workflow:

>>> info.fireTransition('finalize')
>>> state.getState() is None
True

And we have another event that was fired:

>>> events[-1].transition.transition_id
'finalize'
>>> events[-1].source
'b'
>>> events[-1].destination is None
True

Adaptation based workflow

We can also define adaptation-based workflows, with workflows registered as adapters.

>>> class IInvoiceDocument(IDocument):
...    title = Attribute('Title')
>>> class InvoiceDocument(object):
...    implements(IInvoiceDocument)
...    def __init__(self, title, amount):
...        self.title = title
...        self.amount = amount

Having defined the class our workflow will be applied to, we now define the workflow itself.

>>> invoice_init = workflow.Transition(
...     transition_id='init_invoice',
...     title='Invoice Received',
...     source=None,
...     destination='received')
>>>
>>> invoice_paid = workflow.Transition(
...     transition_id='invoice_paid',
...     title='Invoice Paid',
...     source='received',
...     destination='paid')
>>> invoice_wf = workflow.Workflow( [ invoice_init, invoice_paid ] )

We convert our utility workflow to an adapter and register it.

>>> adapted_workflow = workflow.AdaptedWorkflow( invoice_wf )
>>> globalSiteManager.registerAdapter( adapted_workflow, (IInvoiceDocument,), interfaces.IWorkflow )

Now we can utilize the workflow.

>>> invoice = InvoiceDocument('abc', 22)
>>> isinstance( interfaces.IWorkflow( invoice ), workflow.AdaptedWorkflowBase )
True
>>> interfaces.IWorkflowInfo( invoice ).fireTransition('init_invoice')
>>> interfaces.IWorkflowState( invoice ).getState()
'received'
>>> interfaces.IWorkflowInfo( invoice ).fireTransition('invoice_paid')
>>> interfaces.IWorkflowState( invoice ).getState()
'paid'
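
The idea of choosing a workflow by what kind of object is being adapted can be approximated in plain Python. This is a rough analogy, not how zope.component works internally: the real lookup is driven by provided interfaces, whereas this hypothetical registry walks the class hierarchy, and all names here are made up for illustration.

```python
class SketchDocument(object):
    pass


class SketchInvoice(SketchDocument):
    pass


registry = {}  # class -> workflow (here just a list of transition ids)


def register_workflow(cls, wf):
    registry[cls] = wf


def workflow_for(obj):
    """Return the workflow registered for the most specific class of obj."""
    for klass in type(obj).__mro__:
        if klass in registry:
            return registry[klass]
    raise LookupError("no workflow registered for %r" % (type(obj),))


register_workflow(SketchDocument, ["to_a", "a_to_b", "finalize"])
register_workflow(SketchInvoice, ["init_invoice", "invoice_paid"])
```

With both registrations in place, an invoice resolves to the invoice workflow while a plain document keeps the general one, which is the "more than one workflow per site" behaviour described earlier.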

Parallel Workflows

Another interesting feature is the ability to have multiple workflows active on a single object. Each of the workflows is set up as a set of named adapters for IWorkflow, IWorkflowInfo, and IWorkflowState respectively. It does mean, however, that we need to qualify getting the workflow adapters with the workflow name.

Let’s utilize our two workflows already defined as an example. As a convenience, the ParallelWorkflow constructor will register the three workflow components as appropriately named adapters if passed an Interface that they’re applicable for.

>>> class IWorkDocument(IDocument):
...    title = Attribute('Title')
>>> class WorkDocument( Document ):
...    implements( IWorkDocument )
>>> invoice_adapters = workflow.ParallelWorkflow( invoice_wf, "invoice", IWorkDocument )
>>> doc_adapters = workflow.ParallelWorkflow( wf, "doc", IWorkDocument )
>>> wdoc = WorkDocument('text')
>>> from zope.component import getAdapter
>>> getAdapter( wdoc, interfaces.IWorkflowInfo, "doc").fireTransition('to_a')
>>> getAdapter( wdoc, interfaces.IWorkflowState, "doc").getState()
'a'
>>> getAdapter( wdoc, interfaces.IWorkflowInfo, "invoice").fireTransition('init_invoice')
>>> getAdapter( wdoc, interfaces.IWorkflowState, "invoice").getState()
'received'
>>> getAdapter( wdoc, interfaces.IWorkflowState, "doc").getState()
'a'
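
The essential property shown above is that each named workflow keeps its own state on the same object. A minimal sketch of that bookkeeping follows. ParallelStates is a hypothetical class for illustration and says nothing about where ore.workflowed actually stores state.

```python
class ParallelStates(object):
    """Track one independent state per workflow name."""

    def __init__(self):
        self._states = {}

    def get_state(self, name):
        # A workflow that has not been entered yet has state None.
        return self._states.get(name)

    def set_state(self, name, state):
        self._states[name] = state


states = ParallelStates()
states.set_state("doc", "a")
states.set_state("invoice", "received")
```

Moving the "invoice" workflow forward leaves the "doc" state untouched, matching the final check in the doctest.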
