.. currentmodule:: capillary

:Status: Alpha (API feedback is welcome; the API could still break compatibility)
:Generated: |today|
:Version: |release|
:License: BSD 3-Clause
:Authors: Domen Kožar and Aaron McMillin

Introduction
============

:mod:`capillary` is a small integration package for the :mod:`celery` Distributed Task Queue. It aims to let you design workflows (Canvas) declaratively, using Python decorators.

The main reason :mod:`capillary` exists is to eliminate manual tracking of how Celery tasks depend on each other.

:mod:`capillary` executes in two phases:

- It scans the given Python packages for all Celery tasks.
- It executes the tasks based on the metadata passed to the :func:`@pipeline` decorators.

:mod:`capillary` uses:

- :mod:`venusian` to discover Celery tasks using deferred decorators
- :mod:`networkx` to handle the graph operations for tracking dependencies between tasks in the workflow

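
To illustrate the two phases, here is a minimal, stdlib-only sketch. It is *not* how :mod:`capillary` is implemented: ``pipeline`` and ``scan`` below are hypothetical stand-ins. The decorator only attaches metadata and returns the function unchanged, and ``scan`` collects decorated functions from a module, roughly the role :mod:`venusian` plays in capillary.

```python
import types


def pipeline(after=None, tags=None):
    """Hypothetical stand-in: record metadata, return the function unchanged."""
    def decorator(func):
        # Each stacked decorator appends its own metadata entry.
        entries = getattr(func, "_pipeline_meta", [])
        entries.append({"after": after, "tags": list(tags or [])})
        func._pipeline_meta = entries
        return func
    return decorator


def scan(module):
    """Phase 1: collect every decorated function found in ``module``."""
    return {
        name: obj._pipeline_meta
        for name, obj in vars(module).items()
        if callable(obj) and hasattr(obj, "_pipeline_meta")
    }


@pipeline()
def foo(celery_task):
    return "simple task flow"


@pipeline(after="foo")
def bar(celery_task, l):
    return l.upper()


# A throwaway module object standing in for tasks.py.
tasks = types.ModuleType("tasks")
tasks.foo, tasks.bar = foo, bar

registry = scan(tasks)
print(sorted(registry))  # ['bar', 'foo']
print(registry["bar"])   # [{'after': 'foo', 'tags': []}]
```

Because the decorator leaves functions untouched, ``foo(None)`` still returns ``'simple task flow'`` after scanning; only the collected metadata drives the second (execution) phase.
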

.. _simple-example:

Simple example
--------------

Below is the "First Steps with Celery" tutorial, expanded with :mod:`capillary` integration.

The ``tasks.py`` module contains the steps of the pipeline. Steps are marked with the :func:`@pipeline` decorator, whose optional parameters indicate that a step must be executed after some other step, or that the step has certain tags so that groups of tasks can be executed together:

.. code-block:: python

    from capillary import pipeline


    @pipeline()
    def foo(celery_task):
        return 'simple task flow'


    @pipeline(after='foo')
    def bar(celery_task, l):
        return l.upper()

The ``myapp.py`` module then creates a :class:`PipelineConfigurator` instance, which assembles the declared steps:

.. code-block:: python

    from celery import Celery

    from capillary import PipelineConfigurator

    import tasks

    app = Celery('tasks', broker='redis://', backend='redis://')
    pc = PipelineConfigurator(app)
    # Collect the @pipeline steps declared in tasks.py
    pc.scan(tasks)

Start the worker with:

.. code-block:: console

    $ celery worker -A myapp -D

Then execute the pipeline in a Python shell:

.. code-block:: pycon

    >>> from myapp import pc
    >>> asyncresult = pc.run()
    >>> asyncresult.get()
    'SIMPLE TASK FLOW'

This example will be used throughout the user guide as a base.

.. note::

    This example assumes that a Redis broker is running with default settings, but any Celery broker will do. The ``backend`` is defined only so that the result can be retrieved with ``.get()``; it is otherwise not required.

Celery uses a concept called *partials* (partial application, a close relative of currying) to create task signatures. :mod:`capillary` reuses this concept to execute tasks: the value returned from task ``foo`` is passed into task ``bar``.

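
The value passing can be imitated in plain Python. The sketch below is an approximation, not Celery code: the hypothetical ``Sig`` class mimics a Celery partial, where arguments supplied later are *prepended* to the arguments frozen in the signature, and ``None`` stands in for the bound ``celery_task``.

```python
def foo(celery_task):
    return "simple task flow"


def bar(celery_task, l):
    return l.upper()


def suffix(celery_task, l, tail):
    return l + tail


class Sig:
    """Hypothetical, simplified signature: a function plus frozen args."""

    def __init__(self, func, *args):
        self.func, self.args = func, args

    def __call__(self, *prepended):
        # Like a Celery partial: new args go *before* the frozen ones.
        # None stands in for the bound celery_task argument.
        return self.func(None, *prepended, *self.args)


def run_chain(sigs):
    """Call each signature in turn, feeding it the previous result."""
    result = sigs[0]()
    for sig in sigs[1:]:
        result = sig(result)
    return result


print(run_chain([Sig(foo), Sig(bar)]))                    # SIMPLE TASK FLOW
print(run_chain([Sig(foo), Sig(bar), Sig(suffix, "!")]))  # SIMPLE TASK FLOW!
```

The third call shows why the ordering matters: ``suffix`` receives the previous result first and its frozen ``"!"`` argument after it.
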
It is possible to pass extra parameters to specific tasks as described in :ref:`extra-parameters`.

Tags
----

By default, :meth:`PipelineConfigurator.run` executes all scanned tasks that have no tags, in topological order. If ``tags=['foobar']`` is passed to :func:`@pipeline`, the task is run only when ``tagged_as=['foobar']`` is passed to :meth:`PipelineConfigurator.run`.

See :ref:`predefined_defaults` for information on how to reduce boilerplate and group pipelines per tag.
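
A minimal sketch of this selection rule, written as a hypothetical ``select_tasks`` helper rather than capillary's own code. It assumes that, when ``tagged_as`` is given, only tasks sharing at least one requested tag are selected.

```python
def select_tasks(registry, tagged_as=None):
    """Pick task names to run (hypothetical filter, not capillary's code).

    ``registry`` maps each task name to its list of tags. With no
    ``tagged_as``, only untagged tasks are selected; otherwise only tasks
    sharing at least one requested tag are selected (an assumption).
    """
    wanted = set(tagged_as or [])
    if not wanted:
        return [name for name, tags in registry.items() if not tags]
    return [name for name, tags in registry.items() if wanted & set(tags)]


registry = {"foo": [], "bar": [], "baz": ["foobar"]}
print(select_tasks(registry))                        # ['foo', 'bar']
print(select_tasks(registry, tagged_as=["foobar"]))  # ['baz']
```
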

Aborting the pipeline
---------------------

If a step needs to stop the current pipeline (meaning that no further tasks in the pipeline are processed), raise :exc:`capillary.AbortPipeline` anywhere in your pipeline tasks.

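
The effect on a linear chain can be sketched as follows. ``AbortPipeline`` here is a local stand-in class and ``run_linear`` is a hypothetical runner; the sketch assumes the abort simply stops the remaining steps without propagating an error.

```python
class AbortPipeline(Exception):
    """Local stand-in for capillary.AbortPipeline (illustration only)."""


def run_linear(steps, value=None):
    """Run steps in order and return the names of the steps that finished.

    Raising AbortPipeline in a step stops the loop: no further steps run.
    """
    done = []
    for step in steps:
        try:
            value = step(value)
        except AbortPipeline:
            break
        done.append(step.__name__)
    return done


def load(value):
    return "data"


def validate(value):
    if value != "expected":
        raise AbortPipeline("unexpected input")
    return value


def store(value):
    return value


print(run_linear([load, validate, store]))  # ['load']
```
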
.. _extra-parameters:

Extra parameters
----------------

Some :func:`@pipeline` elements might require extra arguments that are only known when :meth:`PipelineConfigurator.run` is called:

.. code-block:: pycon

    >>> @pipeline(
    ...     required_kwarg_names=['param'],
    ... )
    ... def foobar(celery_task, param=None):
    ...     print(param)
    ...     return 'simple task flow'

When :meth:`PipelineConfigurator.run` is called, ``param`` must be passed inside ``required_kwargs``; otherwise :exc:`MissingArgument` is raised.

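
The check can be sketched with a hypothetical ``check_required`` helper and a local ``MissingArgument`` stand-in. Forwarding only the declared names to the step is an assumption made for illustration.

```python
class MissingArgument(Exception):
    """Local stand-in for capillary's MissingArgument (illustration only)."""


def check_required(required_kwarg_names, required_kwargs):
    """Return the declared kwargs, or raise if any of them is missing."""
    missing = [n for n in required_kwarg_names if n not in required_kwargs]
    if missing:
        raise MissingArgument("missing required kwargs: " + ", ".join(missing))
    # Forward only the names this step declared (an assumption).
    return {n: required_kwargs[n] for n in required_kwarg_names}


print(check_required(["param"], {"param": 42, "other": 1}))  # {'param': 42}
try:
    check_required(["param"], {})
except MissingArgument as exc:
    print(exc)  # missing required kwargs: param
```
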
Applying multiple :func:`@pipeline` decorators
------------------------------------------------

The most typical use case for two :func:`@pipeline` decorators is reusing a function in two different pipelines, each tagged differently:

.. code-block:: python

    @pipeline(
        after=['first', 'second'],
        tags=['some_pipeline'],
    )
    @pipeline(
        after=['third'],
        tags=['other_pipeline'],
    )
    def foobar(celery_task):
        return 'simple task flow'

Executing ``pc.run(tagged_as=['some_pipeline'])`` runs the ``foobar`` function as a task after the ``first`` and ``second`` tasks have finished. Executing ``pc.run(tagged_as=['other_pipeline'])`` instead runs ``foobar`` after the ``third`` task has finished.

.. note::

    If both tags are used (e.g. ``pc.run(tagged_as=['some_pipeline', 'other_pipeline'])``), the order of the tags matters: the latter overrides the former. If you specify a different ``name`` parameter for each decorator, both will be executed.

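
The override rule can be sketched with a hypothetical ``resolve_after`` helper that holds one metadata entry per stacked decorator. This is an illustration of the ordering described in the note, not capillary's resolution code.

```python
def resolve_after(entries, tagged_as):
    """Pick the ``after`` list for one function given the requested tags.

    ``entries`` holds one dict per stacked @pipeline decorator. Tags are
    visited in the order given, so a later tag overrides an earlier one.
    """
    chosen = None
    for tag in tagged_as:
        for entry in entries:
            if tag in entry["tags"]:
                chosen = entry["after"]
    return chosen


foobar_entries = [
    {"after": ["first", "second"], "tags": ["some_pipeline"]},
    {"after": ["third"], "tags": ["other_pipeline"]},
]
print(resolve_after(foobar_entries, ["some_pipeline"]))   # ['first', 'second']
print(resolve_after(foobar_entries, ["other_pipeline"]))  # ['third']
print(resolve_after(foobar_entries, ["some_pipeline", "other_pipeline"]))  # ['third']
```
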
.. _predefined_defaults:

Predefined defaults
-------------------

Often, :func:`@pipeline` definitions repeat the same arguments throughout your application. :func:`make_pipeline_from_defaults` lets you create customized, predefined defaults for a pipeline. This example makes a ``foobar_pipeline`` decorator that applies the same tag to each step:

.. code-block:: pycon

    >>> from capillary import make_pipeline_from_defaults
    >>> foobar_pipeline = make_pipeline_from_defaults(
    ...     tags=["foobar"]
    ... )

Then use ``@foobar_pipeline`` just as you would use :func:`@pipeline`; all of your definitions will have ``foobar`` as a tag.

.. note::

    Passing ``tags`` to ``@foobar_pipeline`` overrides the ``["foobar"]`` value.

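
One way to get this "defaults that can be overridden" behaviour is :func:`functools.partial`, whose call-time keyword arguments win over the ones it was created with. The sketch below uses a hypothetical ``make_defaults`` helper and a ``pipeline`` stand-in that merely reports its options; it does not claim that :func:`make_pipeline_from_defaults` is built this way.

```python
from functools import partial


def pipeline(after=None, tags=None, required_kwarg_names=None):
    """Hypothetical stand-in that just reports the options it received."""
    return {"after": after, "tags": tags,
            "required_kwarg_names": required_kwarg_names}


def make_defaults(decorator, **defaults):
    """Pre-bind keyword defaults; keywords given at call time still win."""
    return partial(decorator, **defaults)


foobar_pipeline = make_defaults(pipeline, tags=["foobar"])
print(foobar_pipeline(after="foo")["tags"])     # ['foobar']
print(foobar_pipeline(tags=["other"])["tags"])  # ['other']
```
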

Printing the canvas
-------------------

To see what canvas will actually be executed, call :meth:`PipelineConfigurator.prettyprint` with the same arguments as :meth:`PipelineConfigurator.run`:

.. code-block:: pycon

    >>> pc.prettyprint(args=[], kwargs={})
    tasks.foo() | tasks.bar()

Running a task last
-------------------

Using the constant :class:`capillary.ALL`, it's possible to declare a task as the last one in the pipeline:

.. code-block:: pycon

    >>> from capillary import ALL, pipeline
    >>> @pipeline(
    ...     after=ALL,
    ... )
    ... def last(celery_task, obj):
    ...     print('ALL DONE!')
    ...     return obj

.. note::

    If multiple tasks declare ``after=ALL``, they are run together in a :class:`celery.group` as the last part of the pipeline.

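
The split between the main chain and the final group can be sketched with a sentinel object. ``ALL`` below is a local stand-in for :class:`capillary.ALL`, and ``order_with_all`` is a hypothetical helper that assumes the task names are already in topological order.

```python
ALL = object()  # hypothetical sentinel standing in for capillary.ALL


def order_with_all(ordered_names, after_map):
    """Split tasks into the main chain and a final group.

    ``ordered_names`` is already topologically sorted; tasks whose ``after``
    is ALL are pulled out and returned as the final group.
    """
    chain = [n for n in ordered_names if after_map.get(n) is not ALL]
    final_group = [n for n in ordered_names if after_map.get(n) is ALL]
    return chain, final_group


after_map = {"foo": None, "bar": ["foo"], "last": ALL, "notify": ALL}
print(order_with_all(["foo", "bar", "last", "notify"], after_map))
# (['foo', 'bar'], ['last', 'notify'])
```

In Celery terms, the first list would become a chain and the second a group appended to the end of it.
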
Inner workings of :meth:`~PipelineConfigurator.run()`
--------------------------------------------------------

The following is a quick summary of what happens inside :meth:`~PipelineConfigurator.run()`:

- a task tree is generated from the dependency information
- Celery signatures are created
- the task tree is reduced into a chain using a topological sort
- the chain is executed using :meth:`celery.app.task.Task.apply_async`

.. note::

    Currently the task tree is reduced into a linear chain of tasks, but different "runners" could be implemented in the future.

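
The reduction step can be sketched with the standard library's :class:`graphlib.TopologicalSorter` (Python 3.9+), swapped in here for :mod:`networkx`, which capillary itself uses. ``reduce_to_chain`` and ``render`` are hypothetical helpers; ``render`` only loosely imitates the ``prettyprint`` output format.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def reduce_to_chain(after_map):
    """Flatten a dependency map into one linear execution order.

    ``after_map`` maps each task name to a list of task names it must run
    after. A cycle raises graphlib.CycleError.
    """
    graph = {name: set(deps) for name, deps in after_map.items()}
    return list(TopologicalSorter(graph).static_order())


def render(order, module="tasks"):
    """Render an order in a prettyprint-like ``a() | b()`` style."""
    return " | ".join("%s.%s()" % (module, name) for name in order)


after_map = {"foo": [], "bar": ["foo"], "baz": ["bar"], "qux": ["foo"]}
order = reduce_to_chain(after_map)
print(render(order))
```

Any order that respects the dependencies is valid, so ``qux`` may appear before or after ``bar``; this is exactly the information a purely linear chain throws away.
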
Unit testing
------------

Functions marked as :func:`@pipeline` elements remain plain, untouched functions until :meth:`PipelineConfigurator.scan` is called. If the function body doesn't use its first argument, ``celery_task``, just pass ``None`` as its value.

To unit test our two pipeline elements from :ref:`simple-example`:

.. code-block:: python

    import unittest

    from tasks import bar, foo


    class PipelineTestCase(unittest.TestCase):
        def test_bar(self):
            self.assertEqual(bar(None, 'test'), 'TEST')

        def test_foo(self):
            self.assertEqual(foo(None), 'simple task flow')

To run the tests, install pytest and run it:

.. code-block:: console

    $ py.test tests/

Limitations
-----------

- Using many tasks with large objects passed as arguments can be storage-intensive. One alternative would be to generate signatures on the fly, if Celery permits that.

API
===

.. automodule:: capillary
    :members:
    :exclude-members: PipelineConfigurator

.. autoclass:: capillary.PipelineConfigurator
    :members: run, prettyprint, scan