ROS SMACH示例教程(三)
本用例将探索SMACH的可用性和学习曲线。这个用例从简单地使用SMACH API开始,最后是一个与其他ROS系统接口的具体示例:一名可行性脚本将在turtlesim中协调两个Turtle。
这个用例将展示如何使用python、ROS、rospy以及actionlib。
本文源码均以给出,以下是源码链接:
https://github.com/lizhiwei0304/ROS_SMACH_TUTORIALS
1. 创建ROS包
首先,根据使用的依赖创建ROS包,其中主要使用的依赖有rospy、std_srvs、smach、 turtlesim
roscreate-pkg smach_usecase rospy std_srvs smach turtlrsim
roscd smach_usecase
其次,在smach_usecase
的src
下面建立一个launch
文件夹,定义一个turtle_nodes.launch
来启动turtulesim
的仿真环境,turtle_nodes.launch
的内容如下:
<launch>
<node pkg="tertlesim" name="turtlesim" type="turtlesim"/node>
</launch>
最后你可以输入以下命令来启动turtlesim
的仿真环境
roslaunch smach_usecase turtle_nodes.launch
2.创建一个可执行文件
在smach_usecase
下创建一个scriptes
文件夹,在这个文件夹下面你可以建立相关的executive.py
。这个脚本将是用例其余部分的主要焦点。
在这个脚本中,创建SMACH高的主要框架。这包括导入头文件、main()函数的定义以及空SMACH状态机的创建。
#!/usr/bin/env python
import rospy
import smach
from smach import StateMachine
def main():
rospy.init_node('smach_usecase_executive_step_02')
# Create a SMACH state machine
sm_root = StateMachine(outcomes=[])
# Open the container
with sm_root:
pass
outcome = sm_root.execute()
# Signal ROS shutdown (kill threads in background)
rospy.spin()
if __name__ == '__main__':
main()
在终端中输入以下指令,则开源看到该文件的运行结果
指令:
source smach_usecae exective.py
注意:exective.py
对应源码中的exective_step_02.py
其程序运行结果为
哈哈哈,熟悉的小乌龟又回来啦!!!
[ ERROR ] : InvalidTransitionError: State machine failed consistency check:
No initial state set.
Available states: []
[ ERROR ] : Container consistency check failed.
[ ERROR ] : InvalidTransitionError: State machine failed consistency check:
No initial state set.
Available states: []
[ ERROR ] : Container consistency check failed.
哈哈哈,是不是看到一堆ERROR!!!
不要慌,这是因为这个脚本并不是可以直接用的,只是一个程序框架,需要在里面填充好内容就可以使用了,下面就让我们开始写入内容吧!
3.添加状态机的serivce的调用关系
在sm_root
的状态中添加两个状态,第一个状态用来调用turtlesim的reset服务,第二个状态机是多添加一个新的小乌龟“turtle2”,在(10,0)
这两个任务都是通过服务调用执行的,可以通过SMACH SMACH来实现。ServiceState表示ROS服务调用的执行。它们提供了“成功”、“中止”和“抢占”的潜在结果。
- 第一个服务的名称为 "reset"并且服务类型为std_srvs.Empty.
- 第一个服务的名称为 "spwan"并且服务类型为turtlesim.srv.Spawn,它的turtlesim.srv.SpawnRequest需要四个参数(x, y, theta, name)
一旦你添加了这两个状态,你应该能够启动turtlesim启动文件,然后运行executive.py。当运行执行程序时,你应该看到turtlesim被重置,然后一个新的turtlesim会出现在(5,0)的角落里。
其脚本内容为(exective_step_03.py)
#!/usr/bin/env python
import rospy
import smach
from smach_ros import ServiceState
from smach import StateMachine
import std_srvs.srv
import turtlesim.srv
import turtle_actionlib.msg
def main():
rospy.init_node('smach_usecase_executive')
# 创建好一个 SMACH 状态机
sm_root = StateMachine(outcomes=['succeeded','aborted','preempted'])
# 打开状态机
with sm_root:
###########新添加的内容###########
# 添加第一个状态
## RESET表示状态的名称
## ServiceState('reset', std_srvs.srv.Empty)中表示tultersim的reset服务名称,std_srvs.srv.Empty表示reset服务的srv消息为空
##
StateMachine.add('RESET',
ServiceState('reset', std_srvs.srv.Empty),
{'succeeded':'SPAWN'})
# 添加第二个状态
StateMachine.add('SPAWN',
ServiceState('spawn', turtlesim.srv.Spawn,
request = turtlesim.srv.SpawnRequest(5.0,0.0,0.0,'turtle2')))
################################
# Execute SMACH tree in a separate thread so that we can ctrl-c the script
outcome = sm_root.execute()
# Signal ROS shutdown (kill threads in background)
rospy.spin()
if __name__ == '__main__':
main()
程序运行结果
[INFO] [1648085761.120627]: State machine starting in initial state 'RESET' with userdata:
[]
[INFO] [1648085761.230433]: State machine transitioning 'RESET':'succeeded'-->'SPAWN'
[INFO] [1648085761.278877]: State machine terminating 'SPAWN':'succeeded':'succeeded'
4. 使用SMACH Viewer
可以在执行SMACH树时查看其结构和状态。这是通过加一个Smach Viewer来实现的。内省服务器指向给定SMACH树的根。然后,可以运行smach_viewer在树上进行反思。
smach_查看器教程展示了如何实现这一点。添加内省服务器后,可以像这样运行SMACH Viewer:
rosrun smach_viewer smach_viewer.py
其在程序中的主要改动为(exective_step_04.py)
#!/usr/bin/env python
import rospy
import threading
import smach
from smach_ros import ServiceState, IntrospectionServer
from smach import StateMachine
import std_srvs.srv
import turtlesim.srv
import turtle_actionlib.msg
def main():
rospy.init_node('smach_usecase_executive_03')
# Create a SMACH state machine
sm_root = StateMachine(outcomes=['succeeded','aborted','preempted'])
# Open the container
with sm_root:
# Reset turtlesim
StateMachine.add('RESET',
ServiceState('reset', std_srvs.srv.Empty),
{'succeeded':'SPAWN'})
# Create a second turtle
StateMachine.add('SPAWN',
ServiceState('spawn', turtlesim.srv.Spawn,
request = turtlesim.srv.SpawnRequest(0.0,0.0,0.0,'turtle2')))
########################
# 添加一个IntrospectionServer来查看smach
sis = IntrospectionServer('smach_usecase_01', sm_root, '/USE_CASE')
# 开始查看IntrospectionServer
sis.start()
outcome = sm_root.execute()
# Signal ROS shutdown (kill threads in background)
rospy.spin()
########################
# 结束查看IntrospectionServer
sis.stop()
if __name__ == '__main__':
main()
程序运行结果
[INFO] [1648086781.341742]: State machine starting in initial state 'RESET' with userdata:
[]
[INFO] [1648086781.486968]: State machine transitioning 'RESET':'succeeded'-->'SPAWN'
[INFO] [1648086781.552404]: State machine terminating 'SPAWN':'succeeded':'succeeded'
如果在上一节中将状态命名为“RESET”和“SPWAN”,则应该会看到如下结构:
在上图中,状态由椭圆表示,它们的结果由有向边表示。容器“sm_root”的结果以红色显示。将state名称大写、用小写字母书写结果以及在名称中出现多个单词时使用下划线是一种很好的习惯。
5.TurtleSim形状动作添加界面
接下来,您将添加一些更多的任务和状态序列。这些任务将涉及调用更多的服务,以及几个actionlib操作。在添加调用操作的代码之前,我们需要将操作服务器添加到启动文件中。
在“turtle_nodes.launch”中添加以下几行将打开一对动作服务器,可以用这两个turtle绘制多边形:
<node pkg="turtle_actionlib" name="turtle_shape1" type="shape_server"/>
<node pkg="turtle_actionlib" name="turtle_shape2" type="shape_server">
<remap from="/turtle1/pose" to="/turtle2/pose"/>
<remap from="/turtle1/cmd_vel" to="/turtle2/cmd_vel"/>
</node>
在添加操作状态之前,再添加两个服务状态。第一个应将“turtle1”移动到坐标(5.0,1.0),第二个应将“turtle2”移动到坐标(9.0,5.0)。这些服务分别称为“turtle1/teleport_absolute”和“turtle2/teleport_absolute”。他们使用turtlesim类型的服务。srv.TeleportAbsolute,请求类型有三个参数(x,y,theta)。
接下来添加两个状态,将目标发送到操作服务器“turtle_shape1”和“turtle_shape2”,这两个操作服务器是我们添加到上面的启动文件中的。这可以通过smach实现。SimpleActionState。
第一个state应该用turtle1画一个半径为4.0的大十一边形(11边),第二个state应该用turtle2画一个半径为0.5的小六边形。动作类型为turtle_actionlib。ShapeAction和目标类型有两个参数(边、半径)。
添加上述四种状态后,您应该能够运行脚本,看到两个海龟各自绘制一个多边形。
其脚本文件为(exective_step_05.py)
#!/usr/bin/env python
import rospy
import threading
import smach
from smach_ros import ServiceState, SimpleActionState, IntrospectionServer, set_preempt_handler
from smach import StateMachine
import std_srvs.srv
import turtlesim.srv
import turtle_actionlib.msg
def main():
rospy.init_node('smach_usecase_executive')
# Construct static goals
polygon_big = turtle_actionlib.msg.ShapeGoal(edges = 11, radius = 4.0)
polygon_small = turtle_actionlib.msg.ShapeGoal(edges = 6, radius = 0.5)
# Create a SMACH state machine
sm_root = StateMachine(outcomes=['succeeded','aborted','preempted'])
# Open the container
with sm_root:
# Reset turtlesim
StateMachine.add('RESET',
ServiceState('reset', std_srvs.srv.Empty),
{'succeeded':'SPAWN'})
# Create a second turtle
StateMachine.add('SPAWN',
ServiceState('spawn', turtlesim.srv.Spawn,
request = turtlesim.srv.SpawnRequest(0.0,0.0,0.0,'turtle2')),
{'succeeded':'TELEPORT1'})
# Teleport turtle 1
StateMachine.add('TELEPORT1',
ServiceState('turtle1/teleport_absolute', turtlesim.srv.TeleportAbsolute,
request = turtlesim.srv.TeleportAbsoluteRequest(5.0,1.0,0.0)),
{'succeeded':'TELEPORT2'})
# Teleport turtle 2
StateMachine.add('TELEPORT2',
ServiceState('turtle2/teleport_absolute', turtlesim.srv.TeleportAbsolute,
request = turtlesim.srv.TeleportAbsoluteRequest(9.0,5.0,0.0)),
{'succeeded':'BIG'})
#######################新添加的######################
StateMachine.add("BIG",
SimpleActionState('turtle_shape1', turtle_actionlib.msg.ShapeAction,
goal = polygon_big),
{'succeeded':'SMALL'})
StateMachine.add("SMALL",
SimpleActionState('turtle_shape2', turtle_actionlib.msg.ShapeAction,
goal = polygon_small))
###################################################
# Attach a SMACH introspection server
sis = IntrospectionServer('smach_usecase_01', sm_root, '/USE_CASE')
sis.start()
# Set preempt handler
set_preempt_handler(sm_root)
# Execute SMACH tree in a separate thread so that we can ctrl-c the script
smach_thread = threading.Thread(target = sm_root.execute)
smach_thread.start()
# Signal ROS shutdown (kill threads in background)
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()
如果运行SMACH Viewer,并使用所示的命名约定,turtle1绘制大多边形时,应该会看到下图。
6.并行绘制形状
在下一步中,您将把两个图形绘制状态合并。这将同时发送两个行动目标,并等待它们都终止。
首先构造一个并发,可以在这里找到一个教程。将并发添加到状态机,然后将发送形状目标的两个状态从根状态机移动到并发中。
在执行代码时,您现在应该看到两只海龟同时移动,如下所示:
此外,如果遵循相同的命名约定,SMACH Viewer现在应该显示类似于以下内容的结构:
[图片上传失败...(image-f691dd-1648103746682)]
其脚本文件内容为
#!/usr/bin/env python
import rospy
import threading
import smach
from smach_ros import ServiceState, SimpleActionState, IntrospectionServer, set_preempt_handler
from smach import StateMachine, Concurrence
import std_srvs.srv
import turtlesim.srv
import turtle_actionlib.msg
def main():
rospy.init_node('smach_usecase_executive')
# Construct static goals
polygon_big = turtle_actionlib.msg.ShapeGoal(edges = 11, radius = 4.0)
polygon_small = turtle_actionlib.msg.ShapeGoal(edges = 6, radius = 0.5)
# Create a SMACH state machine
sm_root = StateMachine(outcomes=['succeeded','aborted','preempted'])
# Open the container
with sm_root:
# Reset turtlesim
StateMachine.add('RESET',
ServiceState('reset', std_srvs.srv.Empty),
{'succeeded':'SPAWN'})
# Create a second turtle
StateMachine.add('SPAWN',
ServiceState('spawn', turtlesim.srv.Spawn,
request = turtlesim.srv.SpawnRequest(0.0,0.0,0.0,'turtle2')),
{'succeeded':'TELEPORT1'})
# Teleport turtle 1
StateMachine.add('TELEPORT1',
ServiceState('turtle1/teleport_absolute', turtlesim.srv.TeleportAbsolute,
request = turtlesim.srv.TeleportAbsoluteRequest(5.0,1.0,0.0)),
{'succeeded':'TELEPORT2'})
# Teleport turtle 2
StateMachine.add('TELEPORT2',
ServiceState('turtle2/teleport_absolute', turtlesim.srv.TeleportAbsolute,
request = turtlesim.srv.TeleportAbsoluteRequest(9.0,5.0,0.0)),
{'succeeded':'DRAW_SHAPES'})
# 改动一:创建Concurrence(在上述改变状态触发条件)
shape_cc = Concurrence(
outcomes=['succeeded','aborted','preempted'],
default_outcome='aborted',
outcome_map ={'succeeded':{'BIG':'succeeded','SMALL':'succeeded'}})
# 改动二:添加Concurrence到状态机
StateMachine.add('DRAW_SHAPES',shape_cc)
#改动三:添加单体状态
with shape_cc:
Concurrence.add("BIG",
SimpleActionState('turtle_shape1', turtle_actionlib.msg.ShapeAction,
goal = polygon_big))
Concurrence.add("SMALL",
SimpleActionState('turtle_shape2', turtle_actionlib.msg.ShapeAction,
goal = polygon_small))
# Attach a SMACH introspection server
sis = IntrospectionServer('smach_usecase_01', sm_root, '/USE_CASE')
sis.start()
# Set preempt handler
set_preempt_handler(sm_root)
# Execute SMACH tree in a separate thread so that we can ctrl-c the script
smach_thread = threading.Thread(target = sm_root.execute)
smach_thread.start()
# Signal ROS shutdown (kill threads in background)
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()
7. 当第一只乌龟接近第二只乌龟之后,让乌龟停下来
在下一步中,当turtle1靠得太近时,让turtle2停止。这涉及到实现SMACH的监控模式。这种模式背后的想法是,当某些条件不再满足时,允许一个状态抢占(或停止)一个状态。
SMACH ROS库有一个监视器状态,它提供了将条件回调与主题和消息类型关联的机制。每次状态收到有关指定主题的消息时,都会调用回调。如果回调返回True,那么它将继续阻塞,但如果返回False,它将以“invalid”结果终止。有关监视器状态的更多信息,请参见MonitorState教程页面。
这样一个状态可以与另一个状态并发。那么,当条件不再成立时,要让它杀死同级,唯一需要做的就是给并发一个“子终止回调”。每次SMACH并发中的子级终止时,都会调用此回调(如果指定),并确定是否应向其他状态发送抢占信号。
当使用并发和监视器状态来实现此模式时,子终止回调可以是一个始终返回True的匿名函数:
...
child_termination_cb = lambda state_outcomes: True,
...
这样,终止的第一个状态将导致并发中的其他状态被抢占。如果遵循相同的命名约定,在对executive中的SMACH树进行此修改后,SMACH查看器现在应该显示类似于以下内容的结构:
其脚本代码为(exective_step_07.py)
#!/usr/bin/env python
import rospy
import threading
import smach
from math import sqrt, pow
from smach_ros import ServiceState, SimpleActionState, IntrospectionServer,set_preempt_handler, MonitorState
from smach import StateMachine, Concurrence
import std_srvs.srv
import turtlesim.srv
import turtlesim.msg
import turtle_actionlib.msg
def main():
rospy.init_node('smach_usecase_executive')
# Construct static goals
polygon_big = turtle_actionlib.msg.ShapeGoal(edges = 11, radius = 4.0)
polygon_small = turtle_actionlib.msg.ShapeGoal(edges = 6, radius = 0.5)
# Create a SMACH state machine
sm_root = StateMachine(outcomes=['succeeded','aborted','preempted'])
# Open the container
with sm_root:
# Reset turtlesim
StateMachine.add('RESET',
ServiceState('reset', std_srvs.srv.Empty),
{'succeeded':'SPAWN'})
# Create a second turtle
StateMachine.add('SPAWN',
ServiceState('spawn', turtlesim.srv.Spawn,
request = turtlesim.srv.SpawnRequest(0.0,0.0,0.0,'turtle2')),
{'succeeded':'TELEPORT1'})
# Teleport turtle 1
StateMachine.add('TELEPORT1',
ServiceState('turtle1/teleport_absolute', turtlesim.srv.TeleportAbsolute,
request = turtlesim.srv.TeleportAbsoluteRequest(5.0,1.0,0.0)),
{'succeeded':'TELEPORT2'})
# Teleport turtle 2
StateMachine.add('TELEPORT2',
ServiceState('turtle2/teleport_absolute', turtlesim.srv.TeleportAbsolute,
request = turtlesim.srv.TeleportAbsoluteRequest(9.0,5.0,0.0)),
{'succeeded':'DRAW_SHAPES'})
shape_cc = Concurrence(
outcomes=['succeeded','aborted','preempted'],
default_outcome='aborted',
outcome_map ={'succeeded':{'BIG':'succeeded','SMALL':'succeeded'}})
StateMachine.add('DRAW_SHAPES',shape_cc)
with shape_cc:
Concurrence.add("BIG",
SimpleActionState('turtle_shape1', turtle_actionlib.msg.ShapeAction,
goal = polygon_big))
draw_monitor_cc = Concurrence(
['succeeded','aborted','preempted'],
'aborted',
child_termination_cb = lambda so: True,
outcome_map = {
'succeeded':{'DRAW':'succeeded'},
'preempted':{'DRAW':'preempted','MONITOR':'preempted'},
'aborted':{'MONITOR':'invalid'}})
Concurrence.add('SMALL',draw_monitor_cc)
with draw_monitor_cc:
Concurrence.add("DRAW",
SimpleActionState('turtle_shape2', turtle_actionlib.msg.ShapeAction,
goal = polygon_small))
def turtle_far_away(ud, msg):
"""Returns True while turtle pose in msg is at least 1 unit away from (9,5)"""
if sqrt(pow(msg.x-9.0,2) + pow(msg.y-5.0,2)) > 2.0:
return True
return False
Concurrence.add('MONITOR',
MonitorState('/turtle1/pose',turtlesim.msg.Pose,
cond_cb = turtle_far_away))
# Attach a SMACH introspection server
sis = IntrospectionServer('smach_usecase_01', sm_root, '/USE_CASE')
sis.start()
# Set preempt handler
set_preempt_handler(sm_root)
# Execute SMACH tree in a separate thread so that we can ctrl-c the script
smach_thread = threading.Thread(target = sm_root.execute)
smach_thread.start()
# Signal ROS shutdown (kill threads in background)
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()
8.让第二只乌龟在第一只乌龟离开后重新开始
对该执行器行为的最后修改是向小多边形图形添加一些简单的恢复行为。在上一步中,turtle2一旦停止绘制,就再也不会开始绘制。在这一步中,您将需要添加另一个监视器,它可以在海龟靠近时保持。
为了创建返回的结果循环,还需要将监视器并发和这个新监视器一起放入状态机。请参见下面SMACH Viewer中的图表,以更好地了解其外观。
其脚本为(exective_step_08.py)
#!/usr/bin/env python
import rospy
import threading
import smach
from math import sqrt, pow
from smach_ros import ServiceState, SimpleActionState, IntrospectionServer,set_preempt_handler, MonitorState
from smach import StateMachine, Concurrence
import std_srvs.srv
import turtlesim.srv
import turtlesim.msg
import turtle_actionlib.msg
def main():
rospy.init_node('smach_usecase_executive_08')
# Construct static goals
polygon_big = turtle_actionlib.msg.ShapeGoal(edges = 11, radius = 4.0)
polygon_small = turtle_actionlib.msg.ShapeGoal(edges = 6, radius = 0.5)
# Create a SMACH state machine
sm_root = StateMachine(outcomes=['succeeded','aborted','preempted'])
# Open the container
with sm_root:
# Reset turtlesim
StateMachine.add('RESET',
ServiceState('reset', std_srvs.srv.Empty),
{'succeeded':'SPAWN'})
# Create a second turtle
StateMachine.add('SPAWN',
ServiceState('spawn', turtlesim.srv.Spawn,
request=turtlesim.srv.SpawnRequest(0.0, 0.0, 0.0, 'turtle2')),
{'succeeded': 'TELEPORT1'})
# Teleport turtle 1
StateMachine.add('TELEPORT1',
ServiceState('turtle1/teleport_absolute',
turtlesim.srv.TeleportAbsolute,
request = turtlesim.srv.TeleportAbsoluteRequest(5.0,1.0,0.0)),
{'succeeded':'DRAW_SHAPES'})
shape_cc = Concurrence(outcomes=['succeeded','aborted','preempted'],
default_outcome='aborted',
outcome_map ={'succeeded':{'BIG':'succeeded','SMALL':'succeeded'}})
StateMachine.add('DRAW_SHAPES',shape_cc)
with shape_cc:
Concurrence.add("BIG",
SimpleActionState('turtle_shape1', turtle_actionlib.msg.ShapeAction,
goal = polygon_big))
# Draw a small polygon with the second turtle
small_shape_sm = StateMachine( outcomes=['succeeded', 'aborted', 'preempted'] )
Concurrence.add('SMALL', small_shape_sm)
with small_shape_sm:
# Teleport turtle 2
StateMachine.add('TELEPORT2',
ServiceState('turtle2/teleport_absolute', turtlesim.srv.TeleportAbsolute,
request = turtlesim.srv.TeleportAbsoluteRequest(9.0,5.0,0.0)),
{'succeeded':'DRAW_WITH_MONITOR'})
draw_monitor_cc = Concurrence(
['succeeded','aborted','preempted','interrupted'],
'aborted',
child_termination_cb = lambda so: True,
outcome_map = {
'succeeded':{'DRAW':'succeeded'},
'preempted':{'DRAW':'preempted','MONITOR':'preempted'},
'interrupted':{'MONITOR':'invalid'}})
StateMachine.add('DRAW_WITH_MONITOR',
draw_monitor_cc,
{'interrupted':'WAIT_FOR_CLEAR'})
with draw_monitor_cc:
Concurrence.add("DRAW",
SimpleActionState('turtle_shape2', turtle_actionlib.msg.ShapeAction,
goal = polygon_small))
def turtle_far_away(ud, msg):
"""Returns True while turtle pose in msg is at least 1 unit away from (9,5)"""
if sqrt(pow(msg.x-9.0,2) + pow(msg.y-5.0,2)) > 2.0:
return True
return False
Concurrence.add('MONITOR',
MonitorState('/turtle1/pose',turtlesim.msg.Pose,
cond_cb = turtle_far_away))
StateMachine.add('WAIT_FOR_CLEAR',
MonitorState('/turtle1/pose',turtlesim.msg.Pose,
cond_cb = lambda ud,msg: not turtle_far_away(ud,msg)),
{'valid':'WAIT_FOR_CLEAR','invalid':'TELEPORT2'})
# Attach a SMACH introspection server
sis = IntrospectionServer('smach_usecase_01', sm_root, '/USE_CASE')
sis.start()
# Set preempt handler
set_preempt_handler(sm_root)
# Execute SMACH tree in a separate thread so that we can ctrl-c the script
smach_thread = threading.Thread(target = sm_root.execute)
smach_thread.start()
# Signal ROS shutdown (kill threads in background)
rospy.spin()
sis.stop()
if __name__ == '__main__':
main()