ROS1(Robot Operating System)作为机器人领域最成功的中间件框架,从 2007 年诞生至今已经成为学术界和工业界的事实标准。尽管 ROS2 带来了诸多架构改进,但理解 ROS1 的核心设计理念对于掌握 ROS2 至关重要。本章将系统回顾 ROS1 的核心架构,重点分析其设计决策背后的权衡,为后续章节理解 ROS2 的改进动机奠定基础。
学习目标:
ROS1 采用了中心化的 Master 节点设计,这是整个系统的神经中枢。Master 节点本质上是一个轻量级的名称服务器(Name Server),提供以下核心功能:
+----------------+
| ROS Master |
| (roscore) |
+-------+--------+
|
+-------+--------+
| Name Service |
| Registry |
+----------------+
/ | \
/ | \
+-----+ | +-----+
|Node1| | |Node2|
+-----+ | +-----+
|
+-------+
|Node3 |
+-------+
ROS1 使用 XMLRPC 作为节点与 Master 之间的通信协议。这个选择反映了 2007 年的技术栈现状:XMLRPC 简单、跨语言支持好,但也带来了性能开销。
节点启动与注册流程:
ROS_MASTER_URI 环境变量找到 MasterregisterNode() 方法注册自己话题订阅建立流程:
发布者节点 Master 订阅者节点
| | |
|--registerPublisher-->| |
| |<--registerSubscriber-|
| | |
| |--publisherUpdate---->|
|<-----------------requestTopic---------------|
|------------------TCPROS连接---------------->|
这个过程的关键点:
ROS1 的分布式架构设计有几个重要特征:
1. 松耦合通信 节点之间通过话题进行松耦合通信,发布者和订阅者互不知晓对方存在。这种设计带来了极大的灵活性,但也引入了一些挑战:
2. 点对点数据传输 数据不经过 Master 直接在节点间传输,这个设计决策影响深远:
带宽利用率 = 数据量 / (数据量 + 协议开销)
对于 ROS1:
- 小消息(<1KB):带宽利用率约 60-70%
- 大消息(>100KB):带宽利用率可达 95%+
3. 网络透明性 ROS1 的网络透明性设计让分布式部署变得简单,但也带来了安全隐患:
在多机环境下部署 ROS1 需要careful配置:
# 机器 A (Master 所在)
export ROS_MASTER_URI=http://192.168.1.100:11311
export ROS_IP=192.168.1.100
# 机器 B (Worker 节点)
export ROS_MASTER_URI=http://192.168.1.100:11311
export ROS_IP=192.168.1.101
export ROS_HOSTNAME=worker-robot # 可选,用于 DNS 解析
网络配置检查清单:
话题是 ROS1 中最基础的通信机制,实现了经典的发布-订阅模式。
消息传输特征:
TCPROS 协议细节:
[4字节长度][消息序列化数据]
|
+-- 使用 ROS 消息序列化格式
(类似 Protocol Buffers 但更简单)
性能特征分析:
队列管理策略:
# 发布者队列大小设置
pub = rospy.Publisher('topic', MessageType, queue_size=10)
# queue_size 影响:
# - 太小:高频发布时可能丢失消息
# - 太大:占用内存,增加延迟
服务提供同步的请求-响应通信模式,适合需要确定性结果的场景。
服务调用流程:
客户端 服务器
| |
|---请求(Request)------>|
| |处理请求
|<---响应(Response)------|
| |
关键设计决策:
性能考量:
服务调用开销 = 连接建立时间 + 请求传输 + 处理时间 + 响应传输
典型场景:
- 小请求(<1KB):总开销 5-10ms
- 大请求(>10KB):主要受网络带宽限制
持久连接优化:
# 使用持久连接减少开销
from rospy import ServiceProxy
service = ServiceProxy('service_name', ServiceType, persistent=True)
# 重用 TCP 连接,减少握手开销
动作是 ROS1 中最复杂的通信机制,适合长时间运行的任务。
动作协议的五个组成部分:
动作内部实现 = 5个话题 + 状态机管理
/action_name/goal (目标发送)
/action_name/cancel (取消请求)
/action_name/status (状态更新)
/action_name/feedback (进度反馈)
/action_name/result (最终结果)
状态机转换图:
[PENDING]
|
v
[ACTIVE] <---> [PREEMPTING]
/ \ |
v v v
[SUCCEEDED] [ABORTED] [PREEMPTED]
设计模式应用场景:
Catkin 是 ROS1 的构建系统,基于 CMake 扩展而来,解决了大规模机器人软件的构建挑战。
核心设计目标:
catkin_ws/
├── src/ # 源代码目录
│ ├── package1/
│ │ ├── CMakeLists.txt
│ │ ├── package.xml
│ │ ├── src/
│ │ └── include/
│ └── package2/
├── build/ # 构建中间文件
│ └── [CMake 生成的构建文件]
├── devel/ # 开发空间
│ ├── setup.bash # 环境配置脚本
│ ├── lib/ # 编译的库文件
│ └── share/ # 资源文件
└── install/ # 安装空间(可选)
└── [发布版本文件]
cmake_minimum_required(VERSION 3.0.2)
project(my_robot_package)
# 查找 catkin 和依赖包
find_package(catkin REQUIRED COMPONENTS
roscpp
std_msgs
sensor_msgs
geometry_msgs
)
# 声明 catkin 包
catkin_package(
INCLUDE_DIRS include
LIBRARIES ${PROJECT_NAME}
CATKIN_DEPENDS roscpp std_msgs
DEPENDS eigen3 # 系统依赖
)
# 包含目录
include_directories(
include
${catkin_INCLUDE_DIRS}
)
# 编译库
add_library(${PROJECT_NAME}
src/algorithm.cpp
)
# 编译可执行文件
add_executable(robot_node src/main.cpp)
target_link_libraries(robot_node
${PROJECT_NAME}
${catkin_LIBRARIES}
)
# 安装规则
install(TARGETS robot_node
RUNTIME DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)
package.xml 结构:
<?xml version="1.0"?>
<package format="2">
<name>my_robot_package</name>
<version>1.0.0</version>
<description>机器人控制包</description>
<maintainer email="dev@robot.com">Developer</maintainer>
<license>MIT</license>
<!-- 构建依赖 -->
<buildtool_depend>catkin</buildtool_depend>
<build_depend>roscpp</build_depend>
<!-- 运行依赖 -->
<exec_depend>roscpp</exec_depend>
<exec_depend>rospy</exec_depend>
<!-- 测试依赖 -->
<test_depend>rostest</test_depend>
</package>
依赖解析算法:
1. 并行构建加速:
# 使用所有 CPU 核心
catkin_make -j$(nproc)
# 或使用 catkin_tools(推荐)
catkin build --jobs $(nproc)
2. 增量构建优化:
# 只构建修改的包
catkin build --this
# 构建指定包及其依赖
catkin build package_name --deps
3. ccache 加速重复编译:
# 安装 ccache
sudo apt-get install ccache
# 配置 catkin 使用 ccache
export CC="ccache gcc"
export CXX="ccache g++"
ROS1 的参数服务器是一个中心化的配置存储系统,运行在 Master 节点上。它使用层次化的命名空间存储键值对。
参数类型支持:
命名空间层次结构:
/
├── robot_name # 全局参数
├── /navigation/
│ ├── max_velocity # 导航模块参数
│ ├── planner/
│ │ ├── algorithm # 规划器配置
│ │ └── resolution
│ └── controller/
│ └── gains # 控制器参数
└── /perception/
├── camera/
│ └── fps
└── lidar/
└── range
参数读写操作:
# Python API
import rospy
# 读取参数
max_vel = rospy.get_param('/navigation/max_velocity', 1.0) # 带默认值
params = rospy.get_param('/navigation/') # 获取整个命名空间
# 写入参数
rospy.set_param('/navigation/max_velocity', 2.0)
# 删除参数
rospy.delete_param('/navigation/obsolete_param')
# 检查参数存在
if rospy.has_param('/navigation/max_velocity'):
# 参数存在
pass
# C++ API
ros::NodeHandle nh;
double max_vel;
nh.getParam("/navigation/max_velocity", max_vel);
nh.setParam("/navigation/max_velocity", 2.0);
私有参数与相对命名:
# 私有参数(节点命名空间)
rospy.init_node('my_node')
# 参数实际路径:/my_node/param_name
private_param = rospy.get_param('~param_name')
# 相对参数(当前命名空间)
# 如果当前命名空间是 /robot1/
relative_param = rospy.get_param('sensor/range')
# 实际路径:/robot1/sensor/range
动态重配置是 ROS1 的一个强大特性,允许运行时修改参数而无需重启节点。
配置文件定义(.cfg):
#!/usr/bin/env python
from dynamic_reconfigure.parameter_generator_catkin import *
gen = ParameterGenerator()
# 添加参数:名称、类型、级别、描述、默认值、最小值、最大值
gen.add("max_velocity", double_t, 0,
"Maximum velocity", 1.0, 0.0, 5.0)
gen.add("enable_obstacle_avoidance", bool_t, 0,
"Enable obstacle avoidance", True)
# 枚举类型
algorithm_enum = gen.enum([
gen.const("DWA", int_t, 0, "Dynamic Window Approach"),
gen.const("TEB", int_t, 1, "Timed Elastic Band"),
gen.const("MPC", int_t, 2, "Model Predictive Control")
], "Planning algorithm selection")
gen.add("algorithm", int_t, 0,
"Path planning algorithm", 0, 0, 2,
edit_method=algorithm_enum)
exit(gen.generate("my_package", "my_node", "MyConfig"))
节点实现动态重配置:
#include <dynamic_reconfigure/server.h>
#include <my_package/MyConfigConfig.h>
class MyNode {
private:
dynamic_reconfigure::Server<my_package::MyConfigConfig> server_;
void configCallback(my_package::MyConfigConfig &config, uint32_t level) {
// 更新内部参数
max_velocity_ = config.max_velocity;
use_obstacle_avoidance_ = config.enable_obstacle_avoidance;
// 级别检查(哪些参数改变了)
if (level & 0x1) {
// 速度参数改变
updateVelocityController();
}
}
public:
MyNode() {
// 设置回调
server_.setCallback(
boost::bind(&MyNode::configCallback, this, _1, _2));
}
};
性能特征:
参数读取延迟 = 网络往返时间 + XMLRPC 解析
≈ 1-5ms(局域网)
批量操作优化:
- 单个参数读取:N 次网络往返
- 命名空间读取:1 次网络往返
- 推荐:启动时批量读取,缓存在本地
缓存策略:
class ParameterCache:
def __init__(self, namespace):
# 启动时批量读取
self.params = rospy.get_param(namespace)
self.namespace = namespace
def get(self, key, default=None):
# 从本地缓存读取
return self.params.get(key, default)
def refresh(self):
# 定期刷新缓存
self.params = rospy.get_param(self.namespace)
PR2(Personal Robot 2)是 Willow Garage 开发的双臂移动服务机器人,是 ROS1 发展史上的里程碑项目。其系统架构充分展示了 ROS1 在复杂机器人系统中的应用。
硬件规格:
节点拓扑结构:
PR2 ROS 节点架构
+------------------------------------------------+
| 高层任务规划 |
| task_executive move_base manipulation |
+------------------------------------------------+
|
+------------------------------------------------+
| 中间件服务 |
| tf robot_state diagnostics power_management |
+------------------------------------------------+
|
+------------------------------------------------+
| 硬件抽象层 |
| pr2_controller_manager pr2_ethercat |
+------------------------------------------------+
|
+------------------------------------------------+
| 驱动程序层 |
| motor_drivers sensor_drivers camera_drivers |
+------------------------------------------------+
关键设计决策:
激光雷达 (40Hz) ─┐
├─> 传感器融合 ─> 八叉树地图 ─> 导航规划
立体相机 (30Hz) ─┘ |
v
Kinect (30Hz) ────> 点云处理 ─> 物体识别 ─> 抓取规划
PR2 在不同场景下选择不同的 ROS1 通信机制:
话题使用场景:
服务使用场景:
动作使用场景:
1. 消息传输优化:
// 使用 nodelet 减少数据拷贝
class ImageProcessingNodelet : public nodelet::Nodelet {
// 同进程内使用指针传递,避免序列化
void imageCallback(const sensor_msgs::ImageConstPtr& msg) {
// 零拷贝处理
}
};
2. 话题分流策略:
# 高频数据使用独立话题
/base_scan # 40Hz 激光数据
/base_scan_filtered # 10Hz 滤波数据(导航使用)
/base_scan_marking # 5Hz 障碍标记(建图使用)
3. 参数服务器优化:
# 启动时批量加载参数
rosparam:
- file: config/navigation.yaml
ns: /move_base
- file: config/manipulation.yaml
ns: /arm_controller
PR2 实现了多层次的故障检测与恢复机制:
1. 硬件层安全机制:
2. 软件层监控:
# 诊断聚合器配置
analyzers:
motors:
type: diagnostic_aggregator/GenericAnalyzer
path: Motors
contains: ['motor_']
timeout: 5.0
sensors:
type: diagnostic_aggregator/GenericAnalyzer
path: Sensors
contains: ['laser', 'camera', 'imu']
timeout: 2.0
3. 系统级恢复策略:
PR2 项目为 ROS2 的设计提供了宝贵经验:
在大规模机器人系统中,网络拓扑设计直接影响系统性能:
星型 vs 网状拓扑:
星型拓扑(中心化): 网状拓扑(分布式):
Master Node1 ←→ Node2
/ | \ ↑ × ↓
Node1 Node2 Node3 Node3 ←→ Node4
延迟:O(1)跳 延迟:O(log n)跳
带宽:受中心限制 带宽:多路径均衡
容错:单点故障 容错:多路径冗余
优化策略:
1. 消息批处理(Message Batching):
class BatchedPublisher {
private:
std::vector<sensor_msgs::PointCloud2> batch_;
ros::Timer batch_timer_;
void batchTimerCallback(const ros::TimerEvent&) {
if (!batch_.empty()) {
// 打包发送
custom_msgs::PointCloudBatch msg;
msg.clouds = batch_;
batch_pub_.publish(msg);
batch_.clear();
}
}
public:
void addToBatch(const sensor_msgs::PointCloud2& cloud) {
batch_.push_back(cloud);
if (batch_.size() >= BATCH_SIZE) {
// 立即发送
batchTimerCallback(ros::TimerEvent());
}
}
};
2. 压缩传输:
import rospy
from sensor_msgs.msg import CompressedImage
import cv2
# 发布压缩图像
def publish_compressed(image):
# JPEG 压缩
_, compressed = cv2.imencode('.jpg', image,
[cv2.IMWRITE_JPEG_QUALITY, 80])
msg = CompressedImage()
msg.data = compressed.tostring()
msg.format = "jpeg"
compressed_pub.publish(msg)
3. 共享内存传输(同机优化):
// 使用 nodelet 实现零拷贝
namespace my_package {
class ProcessingNodelet : public nodelet::Nodelet {
void onInit() override {
// 订阅和发布使用共享指针
sub_ = nh_.subscribe<sensor_msgs::PointCloud2>(
"input", 1,
boost::bind(&ProcessingNodelet::callback, this, _1));
}
void callback(const sensor_msgs::PointCloud2ConstPtr& msg) {
// 直接操作指针,无需拷贝
processCloud(msg);
}
};
}
Foreign Relay 是最简单的多 Master 连接方案:
# foreign_relay.py
import rospy
from std_msgs.msg import String
class ForeignRelay:
def __init__(self, foreign_master_uri, local_topic, foreign_topic):
# 连接到外部 Master
self.foreign_master = xmlrpclib.ServerProxy(foreign_master_uri)
# 本地发布者
self.local_pub = rospy.Publisher(local_topic, String, queue_size=10)
# 定期拉取外部话题
self.timer = rospy.Timer(rospy.Duration(0.1), self.relay_callback)
def relay_callback(self, event):
# 从外部 Master 获取数据
data = self.fetch_foreign_topic()
if data:
self.local_pub.publish(data)
优缺点分析:
Multimaster FKIE 是功能最完整的多 Master 解决方案:
<!-- multimaster.launch -->
<launch>
<!-- Master 发现节点 -->
<node name="master_discovery" pkg="master_discovery_fkie"
type="master_discovery">
<param name="mcast_group" value="224.0.0.1"/>
<param name="mcast_port" value="11511"/>
<param name="robot_hosts" value="[robot1, robot2, robot3]"/>
</node>
<!-- Master 同步节点 -->
<node name="master_sync" pkg="master_sync_fkie"
type="master_sync">
<!-- 同步规则配置 -->
<rosparam>
sync_topics: ['/sensor_data', '/robot_status']
sync_services: ['/get_plan', '/compute_ik']
ignore_nodes: ['/rosout', '/diagnostic_agg']
</rosparam>
</node>
</launch>
架构设计:
机器人1 机器人2
+----------+ +----------+
| Master 1 |←---发现---→| Master 2 |
+----------+ +----------+
↑ ↑
|同步 |同步
↓ ↓
+----------+ +----------+
| 节点组 1 |←---数据---→| 节点组 2 |
+----------+ +----------+
关键特性:
针对云机器人场景的网关架构:
class ROSGateway:
def __init__(self):
self.local_master = "http://localhost:11311"
self.cloud_endpoint = "wss://cloud.robot.com/ros"
# WebSocket 连接到云端
self.ws = websocket.WebSocketApp(
self.cloud_endpoint,
on_message=self.on_cloud_message,
on_error=self.on_error
)
# 话题过滤器(减少带宽)
self.topic_filters = {
'/camera/image': self.compress_image,
'/laser/scan': self.downsample_scan,
'/tf': self.filter_tf
}
def compress_image(self, msg):
# H.264 编码
return encode_h264(msg)
def downsample_scan(self, msg):
# 降采样激光数据
return msg[::2] # 每隔一个点
def filter_tf(self, msg):
# 只发送关键坐标系
key_frames = ['map', 'odom', 'base_link']
return [tf for tf in msg if tf.child_frame in key_frames]
# 安装 RT-PREEMPT 内核
sudo apt-get install linux-image-rt-amd64
# 配置实时优先级
cat << EOF > /etc/security/limits.d/ros-rt.conf
@ros-rt - rtprio 98
@ros-rt - memlock unlimited
EOF
# 将用户添加到实时组
sudo usermod -a -G ros-rt $USER
实时节点模板:
class RealtimeNode {
private:
struct sched_param param_;
public:
void setupRealtimePriority() {
// 设置 FIFO 调度策略
param_.sched_priority = 80;
if (sched_setscheduler(0, SCHED_FIFO, ¶m_) == -1) {
ROS_ERROR("Failed to set realtime priority");
}
// 锁定内存,防止换页
if (mlockall(MCL_CURRENT | MCL_FUTURE) == -1) {
ROS_ERROR("Failed to lock memory");
}
// 预分配内存
preallocateMemory();
}
void preallocateMemory() {
// 预分配所有可能使用的内存
message_pool_.reserve(1000);
// 触碰每一页,确保物理内存分配
for (auto& msg : message_pool_) {
memset(&msg, 0, sizeof(msg));
}
}
};
1. 时间触发通信:
class TimeTriggeredPublisher {
private:
ros::Timer timer_;
std::atomic<bool> data_ready_{false};
sensor_msgs::JointState latest_msg_;
public:
TimeTriggeredPublisher(ros::NodeHandle& nh) {
// 固定周期发布
timer_ = nh.createTimer(
ros::Duration(0.001), // 1kHz
&TimeTriggeredPublisher::timerCallback,
this);
}
void timerCallback(const ros::TimerEvent& event) {
if (data_ready_) {
pub_.publish(latest_msg_);
// 监控抖动
double jitter = (event.current_real - event.current_expected).toSec();
if (std::abs(jitter) > 0.0001) { // 100us
ROS_WARN("Timer jitter: %.6f", jitter);
}
}
}
};
2. 优先级队列管理:
template<typename T>
class PriorityMessageQueue {
private:
struct PriorityMessage {
int priority;
T message;
ros::Time timestamp;
bool operator<(const PriorityMessage& other) const {
return priority < other.priority;
}
};
std::priority_queue<PriorityMessage> queue_;
std::mutex mutex_;
public:
void push(const T& msg, int priority) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push({priority, msg, ros::Time::now()});
// 限制队列大小,丢弃低优先级旧消息
while (queue_.size() > MAX_QUEUE_SIZE) {
queue_.pop();
}
}
};
关键论文推荐:
开源项目推荐:
本章系统回顾了 ROS1 的核心架构和关键设计决策。让我们总结本章的关键要点:
系统延迟 = 网络延迟 + 序列化时间 + 处理时间
其中:
- 网络延迟 ≈ RTT/2 (局域网 < 1ms)
- 序列化时间 ≈ 消息大小 / CPU频率 × 复杂度因子
- 处理时间 = 应用相关
| 设计决策 | 优势 | 劣势 | ROS2 改进方向 |
|---|---|---|---|
| 中心化 Master | 简单、易理解 | 单点故障 | DDS 分布式发现 |
| XMLRPC 协议 | 跨语言支持好 | 性能开销大 | DDS-RTPS 二进制协议 |
| 进程隔离 | 故障隔离好 | 资源开销大 | 组件化架构 |
| TCPROS 传输 | 可靠传输 | 缺乏 QoS 控制 | DDS QoS 策略 |
| 无安全机制 | 部署简单 | 安全风险 | SROS2 安全框架 |
通过本章的分析,我们可以看到推动 ROS2 诞生的关键因素:
练习 1.1:Master 故障分析 假设一个 ROS1 系统有 10 个节点正在运行,突然 Master 节点崩溃。请分析: a) 已建立的话题通信是否会中断? b) 新节点能否加入系统? c) 参数服务器的数据会发生什么?
练习 1.2:通信模式选择 为以下场景选择最合适的 ROS1 通信机制(话题/服务/动作),并说明理由: a) 激光雷达数据流(40Hz) b) 获取机器人当前位置 c) 机械臂移动到指定位置 d) 紧急停止命令
练习 1.3:Catkin 工作空间问题 你有两个 Catkin 工作空间:ws1 和 ws2。ws1 中有包 A(版本 1.0),ws2 中也有包 A(版本 2.0)。如果按照 ws1、ws2 的顺序 source 两个工作空间的 setup.bash,运行时会使用哪个版本的包 A?
练习 1.4:性能优化方案设计 某机器人系统有一个相机节点发布 1920×1080 的 RGB 图像(30 FPS),三个处理节点订阅这些图像。当前架构导致 CPU 使用率过高,网络带宽接近饱和。请设计一个优化方案,要求:
练习 1.5:分布式系统设计 设计一个多机器人 SLAM 系统,要求: