-
Notifications
You must be signed in to change notification settings - Fork 92
Actors
New actors are developed by inheriting from the Actor
class.
The following is an example1 of an actor. What it does is quite straightforward. It takes two parameters, tick
and data
, and when it is executing, it produces the given data on its outport every tick seconds (note that tick is not necessarily an integer.)
from calvin.actor.actor import Actor, manage, condition, stateguard
class Trigger(Actor):
"""
Pass on given _data_ every _tick_ seconds
Outputs:
data: given data
"""
@manage(['tick', 'data', 'started'])
def init(self, tick, data):
self.tick = tick
self.data = data
self.timer = None
self.started = False
self.setup()
def setup(self):
self.use('calvinsys.events.timer', shorthand='timer')
def start(self):
self.timer = self['timer'].repeat(self.tick)
self.started = True
def will_migrate(self):
if self.timer:
self.timer.cancel()
def did_migrate(self):
self.setup()
if self.started:
self.start()
@stateguard(lambda self: not self.started)
@condition([], ['data'])
def start_timer(self):
self.start()
return (self.data, )
@stateguard(lambda self: self.timer and self.timer.triggered)
@condition([], ['data'])
def trigger(self):
self.timer.ack()
return (self.data, )
action_priority = (start_timer, trigger)
requires = ['calvinsys.events.timer']
The first lines import the necessary parts of the Actor module.
from calvin.actor.actor import Actor, manage, condition, stateguard
Here, Actor
is the base class of all actors and should always be included, while manage
, condition
and stateguard
are python-decorators to simplify development. Note that as of Calvin version 0.7, stateguard
replaces guard
and is used somewhat differently (see below.)
Every actor will use @manage
and @condition
, while @stateguard
is optional.
The @manage
decorator is used to inform the runtime which attributes of the
actor should be automatically managed when migrating, and the @condition
decorator specifies which ports are expected to have tokens available (for in-ports) and space available for tokens (for out-ports) in order for an action to fire. The condition
decorator also
serves the dual purpose of specifying that the decorated function actually is an action.
Finally, the @stateguard
decorator, which must appear before the @condition
decorator,
decides whether or not to allow the action to run based in the internal state of the actor (as the name implies.)
@manage
This decorator should be used with the init
method of the actor to inform the system of which actor attributes (i.e. actor state) should be serialized on migration.
Usage:
@manage() # Manage every instance variable known upon completion of init
@manage(include = []) # Manage nothing
@manage(include = [foo, bar]) # Manage self.foo and self.bar only. Equivalent to @manage([foo, bar])
@manage(exclude = [foo, bar]) # Manage everything except self.foo and self.bar
@manage(exclude = []) # Same as @manage()
@manage(<list>) # Same as @manage(include = <list>)
N.B. If include and exclude are both present, exclude will be disregarded.
@stateguard
The stateguard decorator refines the criteria for picking an action. It takes one argument, the actor itself, and should return a boolean value based on evaluating the state of the actor with True meaning the action allowed.
e.g.
@stateguard(lambda self: not self.started)
in the above actor.
@condition
The @condition
decorator specifies the required input data and necessary output space. Unless both input and output conditions are fulfilled the action cannot be fired.
Usage:
@condition(action_input=[], action_output=[])
Both action_input
and action_output
parameters are lists of port names. It is always a single token, or space for a single token for out-ports, that is checked for. This restriction was introduced in Calvin 0.7.
An actor should always inherit from the Actor
base class.
class Trigger(Actor):
The docstring of the actor defines the ports and their names. Note: This is
required as it is the only way of defining ports. In this example there are
no in-ports and one out-port, named data
, meaning this actor is a source of tokens. The Inputs:
and Outputs:
headings are optional if no ports are listed under the heading. For
aesthetical reasons the plural 's' in the headings is optional, as is the
capitalization of the words. It is also optional to add a description of the port after the port
string. At least a whitespace is required after the name, but the
convention is to separate port name and port documentation by " : ", i.e.
space colon space
. It is a convention to use lowercase letters for ports,
unless there is good reason not to, e.g. a port taking a URL as input.
"""
Pass on given _data_ every _tick_ seconds
Outputs:
data: given data
"""
The manage
decorator, here used in its simplest form - giving a list of
attributes to manage. Optionally, it could be written @manage( include = ['tick', 'data', 'started] )
. If there are a multitude of attributes, all of which should be
included, an empty argument list, i.e. @manage()
will ensure all of them are
included. It is also possible to only specify which attributes should be
excluded. See the documentation for Actor.actor.manage
for further details.
@manage(['tick', 'data', 'started'])
The init
method serves the same purpose for actors as __init__
does for
python classes, to initialize attributes. This is the first method called
after an actor is created and has access to all actor specific methods.
Any attribute which exists after init
returns can be managed (see above.) Do not add an __init__
function to your actor.
@manage(['tick', 'data', 'started'])
def init(self, tick, data):
self.tick = tick
self.data = data
self.timer = None
self.started = False
self.setup()
It is of course possible to define and call any number of methods in an actor as per a normal python class. The setup
and start
methods are simply convenience function. While it is not enforced, it is a convention to include a setup function where the requirements (see below) are initialized (i.e. by calling the actor method use
.)
def setup(self):
self.use('calvinsys.events.timer', shorthand='timer')
Once this is done, the actor has access to the timer in CalvinSys1 using self['timer']
. The start
method initializes a new timer for use by the actor and sets the state to started.
def start(self):
self.timer = self['timer'].repeat(self.tick)
self.started = True
There are two special methods in this actor: will_migrate()
and did_migrate()
. As the names imply, these are called right before the actor is migrated to another runtime, and after the migration has finished. The purpose is to let the actor first wrap things up and clean up after itself before leaving its current runtime, as well as prepare for running on the new runtime once it has arrived. The Trigger
actor simply cancels its timer before leaving (if it is running), and then sets up a new one before starting up again (if it had started before it was migrated.)1
An action is defined by the @condition
decorator. This particular actor has no in-ports and one out-port, named data
. The following states that the action decorated with this condition requires no tokens on the inports, but space left on the outport data
:
@condition([], ['data'])
This actor has two actions, start_timer
and trigger
. They have identical conditions, which means that if one of them can fire, then both can. However, which one should actually fire depends on which state the actor is in; if the timer is not running, then it should be started, but this should only be done once, of course, so after it has started, the trigger
action should be used instead. This is captured by the different stateguards. The start_timer
action has the stateguard
@stateguard(lambda self: not self.started)
whereas the trigger
action has the stateguard1
@stateguard(lambda self: self.timer and self.timer.triggered)
and they are mutually exclusive (although one would have to read the code in order to determine this.)
Finally, action_priority
determines the priority of the actions, and requires
lists the runtime properties this actor needs to execute its functionality.
The action_priority
is the
order in which conditions
(and guards
) will be evaluated to see which
actions fire. Whenever an action fires, this can cause actions of higher
priority to be ready to fire, and thus the sequence will be iterated,
restarting whenever an action fires, until no action has fired for a full
iteration.
action_priority = ( start_timer, trigger )
The requires
list of properties (actually CalvinSys modules) which must be available on the runtime for it to be able to execute the actor. It can still exist on the runtime, but it cannot execute. In this case, it is a timer:
requires = ['calvinsys.events.timer']
These requirements are used when deploying applications.
The actor test framework is not extensively documented, unfortunately. Here is a brief overview.
test_kwargs
is a dictionary of arguments used for instantiating the actor during the test. Only used when the actor has arguments.
test_calvinsys
is a dictionary with calvinsys names containing the result of doing a read operation on this object (to simulate reading data) and the data expected to written to it (to verify generated data.)
test_set
is a list of test cases. Each test case is a dictionary with data to be generated on inports and the data expected to be written to outports.
The web.TriggeredWeather
actor is a fairly complete example. The actor has one argument, the city to get the weather for, and it uses the weather
calvinsys. The write
operation of the weather
calvinsys expects the name of a location (city), and the read operation retrieves the forecast for this location.
The test setup is as follows:
test_kwargs = {'city': 'Lund'}
test_calvinsys = {'weather': {'read': ["sunny"],
'write': ["Lund", "Lund"]}}
test_set = [
{
'inports': {'trigger': [True]},
'outports': {'forecast': ["sunny"]}
}
]
The test set consists of a single tests which sends True
to the trigger
port and expects the string sunny
on the forecast
port. During this test, reading from the weather
calvinsys will return the string sunny
the first (and only) time, and it expects two writes with the string Lund
.
The test can be executed by pytest
or as a stand-alone python application:
python actorstore/tests/test_actors.py web.TriggeredWeather
The following port properties can be specified in a Calvinscript. Note that this is not commonly done.
tag
routing
queue_length
Specifies a tag on e.g. an outport that can be retrived with certain inport routing properties. The property can be applied on both in- and out-ports. Argument type is string.
<actor-instance>.<port-name>[in/out](tag=<string>)
Routing decides how tokens are routed out or in of a port. The direction of the port the property can be applied on is decided by the arguments. Argument type is category.
<actor-instance>.<port-name>[in/out](routing=<category>)
-
"round-robin"
: Route each tokens to one peer in a round-robin schedule. The argument can be applied on out-ports. -
"balanced"
: Route each tokens to one peer based on queue length. The argument can be applied on out-ports. -
"fanout"
: The default routing of all tokens to all peers with specific name. The argument can be applied on out-ports. -
"default"
: The default routing of all tokens to all peers. The argument can be applied on out-ports. -
"collect-tagged"
: Collect tokens from multiple peers, actions see them individually as {: token}. Use property tag on a connected outport otherwise tag defaults to port id. The argument can be applied on in-ports. -
"collect-unordered"
: Collect tokens from multiple peers, actions see them individually in without order between peers. The argument can be applied on in-ports. -
"random"
: Route each tokens to one peer in a random schedule. The argument can be applied on out-ports. -
"collect-any-tagged"
: Collect tokens from multiple peers, actions see them as one token {: , ... : }. The collected token is available when any of the peers have delivered tokens. Use property tag on a connected outport otherwise tag defaults to port id. The argument can be applied on in-ports. -
"collect-all-tagged"
: Collect tokens from multiple peers, actions see them all as one token {: , ... : }. Use property tag on a connected outport otherwise tag defaults to port id. The argument can be applied on in-ports.
Specifies the minimum number of tokens that the queue can hold. The property can be applied on both in- and out-ports. Argument type is scalar.
<actor-instance>.<port-name>[in/out](queue_length=<scalar>)
1: As of Calvin version 1.0 the calvinsys API has changed (for the better) and it is no longer necessary to manually handle migration by implementing will_migrate
and did_migrate
(there might other circumstances that do so the above is kept to serve as a reference) and the @stateguard
arguments are slightly different but hopefully self-explanatory. See current implementation of std.Trigger
for details.