-
Notifications
You must be signed in to change notification settings - Fork 0
ROS SMACH py notes
🔗 complete tutorial on the Official ROS wiki
🔗 If you hate Python, there's a non-official release of SMACH working on C++, ROS SMACC; here's the Getting Started, and here the GitHub SMACC repository
Just download from GitHub into your preferred ROS workspace, then compile.
git clone https://github.com/rhaschke/executive_smach_tutorials.git
SMACH consente di creare lo scheletro delle macchine a stato in maniera formale, ma non solo. Questo framework funziona solo con Python: nessun codice C++ (purtroppo...).
Il codice, almeno quello di base, dovrebbe essere strutturato in questa maniera:
- dichiarazione degli stati della macchina e dei loro cicli di esecuzione
- istanziazione della state machine
- avvio della macchina
# l'oggetto smach viene usato per registrare gli stati
import smach
import smach_ros
Ogni stato è implementato come classe connessa per ereditarietà a smach.State
. Ogni classe ha un costruttore che istanzia lo stato, e una funzione execute
che determina lo stato successivo in base ad altre informazioni esterne.
class Foo(smach.State):
def __init__(self, outcomes=['outcome1', 'outcome2']):
# Your state initialization goes here
def execute(self, userdata):
# Your state execution goes here
Outcomes. Ogni stato ha un certo numero di archi uscenti detti outcomes. Ogni uscita è etichettata usando una stringa. La execute()
, quando chiamata, deve ritornare una tra queste outcomes.
Puoi vedere ogni nodo come un programma indipendente. execute()
è il suo ciclo di funzionamento. Apena si realizza una certa condizione, la funzione ritorna un risultato. Il fatto che poi quel risultato venga utilizzato per far partire un altro stato è del tutto indifferente: è qualcosa che viene realizzato esternamente allo stato.
Anzitutto, bisogna istanziare un oggetto di tipo smach.StateMachine
che sarà la base della macchina.
sm = smach.StateMachine( outcomes=[ ... ] )
outcomes
contiene le etichette degli stati finali. Nota che questa classe è solo un handler per far partire la macchina e per ottenere informazioni. La registrazione viene fatta direttamente la libreria!
Per registrare gli stati e le transizioni tra stati, si usa il metodo .add
:
💡 Il primo stato che viene registrato è quello iniziale!
# sintassi:
# .add( 'state_label', state_class(), transitions={...}
# transitions={ ..., 'outcome_label':'next_state_label' }
'''
ES: myBestState: out1(->out1state) , out2(->out2state)
'''
with sm:
smach.StateMachine.add( 'bestState', myBestState(),
transitions={'out1':'oit1state', 'out2':'out2state'} )
e così via. Nota bene: per inserire gli stati si usa non l'handler direttamente, ma smach.StateMachine
. Chi dice alla classe smach che sto lavorando proprio con quell'handler? Glielo dice lo statement with
: è per questo che bisogna usare proprio questa struttura per poter aggiungere stati alla macchina.
Per avviare la macchina a stati,
outcome = sm.execute()
# rospy.spin()
Nota bene: il metodo execute
ritorna una outcome. Questo perchè non tutte le macchine eseguono un compito all'infinito. Molte macchine a stati rappresentano modi di eseguire una computazione in generale finita. Per questo il metodo ritorna una outcome.
Ovviamente se la macchina contenesse dei cicli o dovesse funzionare in eterno, il metodo non ritornerebbe mai. Qualunque comportamento tu voglia implementare, la forma di questa funzione va bene.
🔗 a tutorial page about remapping in SMACH from the ROS1 Wiki
In quanto programma indipendente, uno stato può richiedere esplicitamente delle informazioni in ingresso. Ecco come:
- lo stato richiede da costruttore le sue
input_keys
eoutput_keys
come liste - lo stato può usare queste chiavi come variabili nella
execute()
usando il parametrouserdata
come oggetto.
Vedi ad esempio qui di seguito, uno stato che richiede una chiave in ingresso e una in uscita.
class Foo(smach.State):
def __init__(self, outcomes=['outcome1', 'outcome2'],
input_keys=['foo_input'],
output_keys=['foo_output'])
def execute(self, userdata):
# Do something with userdata
if userdata.foo_input == 1:
return 'outcome1'
else:
userdata.foo_output = 3
return 'outcome2'
Lato main, ecco come eseguire il remapping di queste variabili:
sm_top = smach.StateMachine(outcomes=['outcome4','outcome5'],
input_keys=['sm_input'],
output_keys=['sm_output'])
with sm_top:
smach.StateMachine.add('FOO', Foo(),
transitions={'outcome1':'BAR',
'outcome2':'outcome4'},
remapping={'foo_input':'sm_input',
'foo_output':'sm_data'})
smach.StateMachine.add('BAR', Bar(),
transitions={'outcome2':'FOO'},
remapping={'bar_input':'sm_data',
'bar_output1':'sm_output'})
Lo schema realizzato da questo "semplice" codice è il seguente:
Un altro esempio di remapping lato main che non usa parametri ausiliari a costruttore ma direttamente l'istanziazione di un nuovo campo nell'handler:
def main():
rospy.init_node('smach_example_state_machine')
# Create a SMACH state machine
sm = smach.StateMachine(outcomes=['outcome4'])
sm.userdata.sm_counter = 0
# Open the container
with sm:
# Add states to the container
smach.StateMachine.add('FOO', Foo(),
transitions={'outcome1':'BAR',
'outcome2':'outcome4'},
remapping={'foo_counter_in':'sm_counter',
'foo_counter_out':'sm_counter'})
smach.StateMachine.add('BAR', Bar(),
transitions={'outcome1':'FOO'},
remapping={'bar_counter_in':'sm_counter'})
🔗 See an explaination here
Come fare? Nulla di più semplice. Considera che ogni macchina a stati ha il suo handler, e che l'handler non ha alcuna differenza da una classe blocco se non per il fatto che rappresennta una macchina a stati piuttosto che un nodo.
Che significa tutto questo? Che è possibile astrarre una macchina a stati pensandola come uno stato della macchina.
considera l'esempio:
SUB è la macchina a stati al livello inferiore. Se la si espande, si ottiene
A livello di codice, nulla di più semplice:
sm_top = smach.StateMachine( outcomes=[ 'outcome5' ] )
with sm_top:
# gli stati
smach.StateMachine.add( 'BAS', bas(),
transitions={ 'outcome3':'SUB' } )
#e la sotto macchina a stati
sm_SUB = smach.StateMachine( outcomes=[ 'outcome4' ] )
with sm_SUB:
#definizione di SUB
smach.StateMachine.add( 'FOO', foo(),
transitions={ ... } )
smach.StateMachine.add( 'BAR', ... )
# la macchina a stati SUB è solo un altro stato per sm_top
smach.StateMachine.add( 'SUB', sm_SUB,
transitions={ 'outcome4':'outcome5' }
#!/usr/bin/env python
import rospy
import smach, smach_ros
class init_state( smach.State ):
def __init__( self ):
smach.State.__init__( self, outcomes=['read_a', 'read_others'], input_keys=['idx', 'seq'], output_keys=['new_idx'] )
def execute( self, data ):
if data.seq[0] == 'a':
data.new_idx = 1
return 'read_a'
else:
return 'read_others'
class read_state_1( smach.State ):
def __init__( self ):
smach.State.__init__( self, outcomes=['read_s', 'read_q', 'read_a', 'read_others'], input_keys=['idx', 'seq'], output_keys=['new_idx'] )
def execute( self, data ):
if data.seq[data.idx] == 's':
data.new_idx = data.idx + 1
return 'read_s'
elif data.seq[data.idx] == 'q':
data.new_idx = data.idx + 1
return 'read_q'
elif data.seq[data.idx] == 'a':
data.new_idx = data.idx + 1
return 'read_a'
else:
return 'read_others'
class read_state_2( smach.State ):
def __init__( self ):
smach.State.__init__( self, outcomes=['read_p', 'read_others'], input_keys=['idx', 'seq'], output_keys=['new_idx'] )
def execute( self, data ):
if data.seq[data.idx] == 'p':
data.new_idx = data.idx + 1
return 'read_p'
else:
data.new_idx = data.idx + 1
return 'read_others'
if __name__ == "__main__":
rospy.init_node( "test_state_machine_1" )
sm = smach.StateMachine( outcomes = ['accepted', 'discarded'] )
sm.userdata.index = 0;
sm.userdata.sequence = ""
sm_remapping = {'idx':'index', 'seq':'sequence', 'new_idx':'idx'}
with sm:
smach.StateMachine.add( 'init_state', init_state(), transitions={'read_a':'read_state_1', 'read_others':'discarded'}, remapping=sm_remapping )
smach.StateMachine.add( 'read_state_1', read_state_1(), transitions={'read_s':'read_state_1', 'read_q':'read_state_2', 'read_a':'accepted', 'read_others':'discarded'}, remapping=sm_remapping )
smach.StateMachine.add( 'read_state_2', read_state_2(), transitions={ 'read_p':'read_state_1', 'read_others':'read_state_2' }, remapping=sm_remapping )
# rosrun smach_viewer smach_viewer.py
sis = smach_ros.IntrospectionServer('server_name', sm, '/SM_ROOT')
sis.start()
sm.userdata.sequence = "asssssqiusfhnvsiufnhcisuhcniefscpsssssssa"
rospy.loginfo( "word: %s > outcome: %s", sm.userdata.sequence, sm.execute( ) )
rospy.spin()
sis.stop()
🔗 see this project on GitHub