Skip to content

ROS SMACH py notes

Francesco Ganci edited this page Jun 28, 2022 · 2 revisions

⚠️ Not yet reviewed, in Italian ⚠️

🔗 Official Site of ROS SMACH

🔗 SMACH on GitHub

🔗 complete tutorial on the Official ROS wiki

🔗 If you hate Python, there's a non-official release of SMACH working on C++, ROS SMACC; here's the Getting Started, and here the GitHub SMACC repository

Install SMACH

Just download from GitHub into your preferred ROS workspace, then compile.

git clone https://github.com/rhaschke/executive_smach_tutorials.git

ROS - SMACH - come si usa

Utilizzo di base

SMACH consente di creare lo scheletro delle macchine a stato in maniera formale, ma non solo. Questo framework funziona solo con Python: nessun codice C++ (purtroppo...).

Il codice, almeno quello di base, dovrebbe essere strutturato in questa maniera:

  1. dichiarazione degli stati della macchina e dei loro cicli di esecuzione
  2. istanziazione della state machine
  3. avvio della macchina

Importazione

# l'oggetto smach viene usato per registrare gli stati
import smach
import smach_ros

Passo 1 — implementazione degli stati

Ogni stato è implementato come classe connessa per ereditarietà a smach.State. Ogni classe ha un costruttore che istanzia lo stato, e una funzione execute che determina lo stato successivo in base ad altre informazioni esterne.

class Foo(smach.State):
     def __init__(self, outcomes=['outcome1', 'outcome2']):
       # Your state initialization goes here

     def execute(self, userdata):
        # Your state execution goes here

Outcomes. Ogni stato ha un certo numero di archi uscenti detti outcomes. Ogni uscita è etichettata usando una stringa. La execute(), quando chiamata, deve ritornare una tra queste outcomes.

Puoi vedere ogni nodo come un programma indipendente. execute() è il suo ciclo di funzionamento. Apena si realizza una certa condizione, la funzione ritorna un risultato. Il fatto che poi quel risultato venga utilizzato per far partire un altro stato è del tutto indifferente: è qualcosa che viene realizzato esternamente allo stato.

Passo 2 — Istanziazione della macchina

Anzitutto, bisogna istanziare un oggetto di tipo smach.StateMachine che sarà la base della macchina.

sm = smach.StateMachine( outcomes=[ ... ] )

outcomes contiene le etichette degli stati finali. Nota che questa classe è solo un handler per far partire la macchina e per ottenere informazioni. La registrazione viene fatta direttamente la libreria!

Per registrare gli stati e le transizioni tra stati, si usa il metodo .add :

💡 Il primo stato che viene registrato è quello iniziale!

# sintassi:
#    .add( 'state_label', state_class(), transitions={...}
#    transitions={ ..., 'outcome_label':'next_state_label' }
'''
ES: myBestState: out1(->out1state) , out2(->out2state)
'''
with sm:
	smach.StateMachine.add( 'bestState', myBestState(),
		transitions={'out1':'oit1state', 'out2':'out2state'} )

e così via. Nota bene: per inserire gli stati si usa non l'handler direttamente, ma smach.StateMachine. Chi dice alla classe smach che sto lavorando proprio con quell'handler? Glielo dice lo statement with: è per questo che bisogna usare proprio questa struttura per poter aggiungere stati alla macchina.

Passo 3 — Esecuzione della macchina

Per avviare la macchina a stati,

outcome = sm.execute()
# rospy.spin()

Nota bene: il metodo execute ritorna una outcome. Questo perchè non tutte le macchine eseguono un compito all'infinito. Molte macchine a stati rappresentano modi di eseguire una computazione in generale finita. Per questo il metodo ritorna una outcome.

Ovviamente se la macchina contenesse dei cicli o dovesse funzionare in eterno, il metodo non ritornerebbe mai. Qualunque comportamento tu voglia implementare, la forma di questa funzione va bene.

Remapping

🔗 a tutorial page about remapping in SMACH from the ROS1 Wiki

Input/Output keys

In quanto programma indipendente, uno stato può richiedere esplicitamente delle informazioni in ingresso. Ecco come:

  • lo stato richiede da costruttore le sue input_keys e output_keys come liste
  • lo stato può usare queste chiavi come variabili nella execute() usando il parametro userdata come oggetto.

Vedi ad esempio qui di seguito, uno stato che richiede una chiave in ingresso e una in uscita.

class Foo(smach.State):
   def __init__(self, outcomes=['outcome1', 'outcome2'],
                      input_keys=['foo_input'],
                      output_keys=['foo_output'])

   def execute(self, userdata):
      # Do something with userdata
      if userdata.foo_input == 1:
          return 'outcome1'
      else:
          userdata.foo_output = 3
          return 'outcome2'

Lato main, ecco come eseguire il remapping di queste variabili:

sm_top = smach.StateMachine(outcomes=['outcome4','outcome5'],
                          input_keys=['sm_input'],
                          output_keys=['sm_output'])
  with sm_top:
     smach.StateMachine.add('FOO', Foo(),
                            transitions={'outcome1':'BAR',
                                         'outcome2':'outcome4'},
                            remapping={'foo_input':'sm_input',
                                       'foo_output':'sm_data'})
     smach.StateMachine.add('BAR', Bar(),
                            transitions={'outcome2':'FOO'},
                            remapping={'bar_input':'sm_data',
                                       'bar_output1':'sm_output'})

Lo schema realizzato da questo "semplice" codice è il seguente:

SMACH_remapping

Un altro esempio di remapping lato main che non usa parametri ausiliari a costruttore ma direttamente l'istanziazione di un nuovo campo nell'handler:

def main():
    rospy.init_node('smach_example_state_machine')

    # Create a SMACH state machine
    sm = smach.StateMachine(outcomes=['outcome4'])
    sm.userdata.sm_counter = 0

    # Open the container
    with sm:
        # Add states to the container
        smach.StateMachine.add('FOO', Foo(), 
                               transitions={'outcome1':'BAR', 
                                            'outcome2':'outcome4'},
                               remapping={'foo_counter_in':'sm_counter',
                                          'foo_counter_out':'sm_counter'})
        smach.StateMachine.add('BAR', Bar(), 
                               transitions={'outcome1':'FOO'},
                               remapping={'bar_counter_in':'sm_counter'})

Nested State Machines

🔗 See an explaination here

Come fare? Nulla di più semplice. Considera che ogni macchina a stati ha il suo handler, e che l'handler non ha alcuna differenza da una classe blocco se non per il fatto che rappresennta una macchina a stati piuttosto che un nodo.

Che significa tutto questo? Che è possibile astrarre una macchina a stati pensandola come uno stato della macchina.

considera l'esempio:

SMACH_nest_1

SUB è la macchina a stati al livello inferiore. Se la si espande, si ottiene

SMACH_nest_2

A livello di codice, nulla di più semplice:

sm_top = smach.StateMachine( outcomes=[ 'outcome5' ] )
with sm_top:
	# gli stati
	smach.StateMachine.add( 'BAS', bas(), 
		transitions={ 'outcome3':'SUB' } )
	#e la sotto macchina a stati
	sm_SUB = smach.StateMachine( outcomes=[ 'outcome4' ] )
	with sm_SUB:
		#definizione di SUB
		smach.StateMachine.add( 'FOO', foo(), 
			transitions={ ... } )
		smach.StateMachine.add( 'BAR', ... )
	# la macchina a stati SUB è solo un altro stato per sm_top
	smach.StateMachine.add( 'SUB', sm_SUB, 
		transitions={ 'outcome4':'outcome5' }

An example -- how to use SMACH

#!/usr/bin/env python

import rospy
import smach, smach_ros



class init_state( smach.State ):
	def __init__( self ):
		smach.State.__init__( self, outcomes=['read_a', 'read_others'], input_keys=['idx', 'seq'], output_keys=['new_idx'] )
	
	def execute( self, data ):
		if data.seq[0] == 'a':
			data.new_idx = 1
			return 'read_a'
		else:
			return 'read_others'

class read_state_1( smach.State ):
	def __init__( self ):
		smach.State.__init__( self, outcomes=['read_s', 'read_q', 'read_a', 'read_others'], input_keys=['idx', 'seq'], output_keys=['new_idx'] )
	
	def execute( self, data ):
		if data.seq[data.idx] == 's':
			data.new_idx = data.idx + 1
			return 'read_s'
		elif data.seq[data.idx] == 'q':
			data.new_idx = data.idx + 1
			return 'read_q'
		elif data.seq[data.idx] == 'a':
			data.new_idx = data.idx + 1
			return 'read_a'
		else:
			return 'read_others'

class read_state_2( smach.State ):
	def __init__( self ):
		smach.State.__init__( self, outcomes=['read_p', 'read_others'], input_keys=['idx', 'seq'], output_keys=['new_idx'] )
	
	def execute( self, data ):
		if data.seq[data.idx] == 'p':
			data.new_idx = data.idx + 1
			return 'read_p'
		else:
			data.new_idx = data.idx + 1
			return 'read_others'




if __name__ == "__main__":
	
	rospy.init_node( "test_state_machine_1" )
	
	sm = smach.StateMachine( outcomes = ['accepted', 'discarded'] )
	sm.userdata.index = 0;
	sm.userdata.sequence = ""
	
	sm_remapping = {'idx':'index', 'seq':'sequence', 'new_idx':'idx'}
	
	with sm:
		smach.StateMachine.add( 'init_state', init_state(), transitions={'read_a':'read_state_1', 'read_others':'discarded'}, remapping=sm_remapping )
		smach.StateMachine.add( 'read_state_1', read_state_1(), transitions={'read_s':'read_state_1', 'read_q':'read_state_2', 'read_a':'accepted', 'read_others':'discarded'}, remapping=sm_remapping )
		smach.StateMachine.add( 'read_state_2', read_state_2(), transitions={ 'read_p':'read_state_1', 'read_others':'read_state_2' }, remapping=sm_remapping )
	
	# rosrun smach_viewer smach_viewer.py
	sis = smach_ros.IntrospectionServer('server_name', sm, '/SM_ROOT')
	sis.start()
	
	sm.userdata.sequence = "asssssqiusfhnvsiufnhcisuhcniefscpsssssssa"
	rospy.loginfo( "word: %s > outcome: %s", sm.userdata.sequence, sm.execute( ) )
	
	rospy.spin()
	sis.stop()

A more interesting example

🔗 see this project on GitHub

Clone this wiki locally