ros2_tutorial

第 1 章:ROS1 核心概念回顾

开篇段落

ROS1(Robot Operating System)作为机器人领域最成功的中间件框架,从 2007 年诞生至今已经成为学术界和工业界的事实标准。尽管 ROS2 带来了诸多架构改进,但理解 ROS1 的核心设计理念对于掌握 ROS2 至关重要。本章将系统回顾 ROS1 的核心架构,重点分析其设计决策背后的权衡,为后续章节理解 ROS2 的改进动机奠定基础。

学习目标

Master 节点与分布式架构

ROS Master 的角色定位

ROS1 采用了中心化的 Master 节点设计,这是整个系统的神经中枢。Master 节点本质上是一个轻量级的名称服务器(Name Server),提供以下核心功能:

  1. 名称注册与解析:维护节点名称到网络地址的映射表
  2. 服务发现:帮助节点之间建立点对点连接
  3. 参数服务器:存储和分发全局配置参数
     +----------------+
     |   ROS Master   |
     |   (roscore)    |
     +-------+--------+
             |
     +-------+--------+
     |  Name Service  |
     |   Registry     |
     +----------------+
            / | \
           /  |  \
    +-----+   |   +-----+
    |Node1|   |   |Node2|
    +-----+   |   +-----+
              |
          +-------+
          |Node3  |
          +-------+

XMLRPC 协议与通信流程

ROS1 使用 XMLRPC 作为节点与 Master 之间的通信协议。这个选择反映了 2007 年的技术栈现状:XMLRPC 简单、跨语言支持好,但也带来了性能开销。

节点启动与注册流程

  1. 节点启动时,通过 ROS_MASTER_URI 环境变量找到 Master
  2. 使用 XMLRPC 调用 registerNode() 方法注册自己
  3. Master 返回注册确认,节点获得唯一 ID
  4. 节点注册自己提供的话题/服务到 Master

话题订阅建立流程

发布者节点                Master                订阅者节点
    |                      |                      |
    |--registerPublisher-->|                      |
    |                      |<--registerSubscriber-|
    |                      |                      |
    |                      |--publisherUpdate---->|
    |<-----------------requestTopic---------------|
    |------------------TCPROS连接---------------->|

这个过程的关键点:

分布式系统设计考量

ROS1 的分布式架构设计有几个重要特征:

1. 松耦合通信 节点之间通过话题进行松耦合通信,发布者和订阅者互不知晓对方存在。这种设计带来了极大的灵活性,但也引入了一些挑战:

2. 点对点数据传输 数据不经过 Master 直接在节点间传输,这个设计决策影响深远:

带宽利用率 = 数据量 / (数据量 + 协议开销)

对于 ROS1:
- 小消息(<1KB):带宽利用率约 60-70%
- 大消息(>100KB):带宽利用率可达 95%+

3. 网络透明性 ROS1 的网络透明性设计让分布式部署变得简单,但也带来了安全隐患:

多机通信配置

在多机环境下部署 ROS1 需要careful配置:

# 机器 A (Master 所在)
export ROS_MASTER_URI=http://192.168.1.100:11311
export ROS_IP=192.168.1.100

# 机器 B (Worker 节点)
export ROS_MASTER_URI=http://192.168.1.100:11311
export ROS_IP=192.168.1.101
export ROS_HOSTNAME=worker-robot  # 可选,用于 DNS 解析

网络配置检查清单

  1. 所有机器时钟同步(NTP)
  2. 防火墙开放必要端口(11311 for Master, 随机端口 for nodes)
  3. 主机名解析正确(/etc/hosts 或 DNS)
  4. 网络延迟 < 10ms(局域网环境)

话题、服务、动作通信机制

话题(Topics):发布-订阅模式

话题是 ROS1 中最基础的通信机制,实现了经典的发布-订阅模式。

消息传输特征

TCPROS 协议细节

[4字节长度][消息序列化数据]
           |
           +-- 使用 ROS 消息序列化格式
               (类似 Protocol Buffers 但更简单)

性能特征分析

队列管理策略

# 发布者队列大小设置
pub = rospy.Publisher('topic', MessageType, queue_size=10)
# queue_size 影响:
# - 太小:高频发布时可能丢失消息
# - 太大:占用内存,增加延迟

服务(Services):请求-响应模式

服务提供同步的请求-响应通信模式,适合需要确定性结果的场景。

服务调用流程

客户端                    服务器
  |                         |
  |---请求(Request)------>|
  |                         |处理请求
  |<---响应(Response)------|
  |                         |

关键设计决策

  1. 同步阻塞:客户端等待服务器响应
  2. 单次连接:每次调用建立新的 TCP 连接
  3. 无状态:服务器不维护客户端状态

性能考量

服务调用开销 = 连接建立时间 + 请求传输 + 处理时间 + 响应传输

典型场景:
- 小请求(<1KB):总开销 5-10ms
- 大请求(>10KB):主要受网络带宽限制

持久连接优化

# 使用持久连接减少开销
from rospy import ServiceProxy
service = ServiceProxy('service_name', ServiceType, persistent=True)
# 重用 TCP 连接,减少握手开销

动作(Actions):带反馈的异步任务

动作是 ROS1 中最复杂的通信机制,适合长时间运行的任务。

动作协议的五个组成部分

  1. Goal:任务目标
  2. Result:最终结果
  3. Feedback:执行过程中的反馈
  4. Status:任务状态(pending/active/succeeded/aborted)
  5. Cancel:取消机制
动作内部实现 = 5个话题 + 状态机管理
           /action_name/goal        (目标发送)
           /action_name/cancel      (取消请求)
           /action_name/status      (状态更新)
           /action_name/feedback    (进度反馈)
           /action_name/result      (最终结果)

状态机转换图

        [PENDING]
            |
            v
        [ACTIVE] <---> [PREEMPTING]
         /    \              |
        v      v             v
   [SUCCEEDED] [ABORTED] [PREEMPTED]

设计模式应用场景

Catkin 构建系统

Catkin 的设计理念

Catkin 是 ROS1 的构建系统,基于 CMake 扩展而来,解决了大规模机器人软件的构建挑战。

核心设计目标

  1. 包管理:支持细粒度的功能包组织
  2. 依赖管理:自动处理包之间的依赖关系
  3. 并行构建:充分利用多核 CPU
  4. 跨平台:支持 Linux、macOS(部分)

工作空间结构

catkin_ws/
├── src/               # 源代码目录
│   ├── package1/
│   │   ├── CMakeLists.txt
│   │   ├── package.xml
│   │   ├── src/
│   │   └── include/
│   └── package2/
├── build/             # 构建中间文件
│   └── [CMake 生成的构建文件]
├── devel/             # 开发空间
│   ├── setup.bash     # 环境配置脚本
│   ├── lib/           # 编译的库文件
│   └── share/         # 资源文件
└── install/           # 安装空间(可选)
    └── [发布版本文件]

CMakeLists.txt 深度解析

cmake_minimum_required(VERSION 3.0.2)
project(my_robot_package)

# 查找 catkin 和依赖包
find_package(catkin REQUIRED COMPONENTS
  roscpp
  std_msgs
  sensor_msgs
  geometry_msgs
)

# 声明 catkin 包
catkin_package(
  INCLUDE_DIRS include
  LIBRARIES ${PROJECT_NAME}
  CATKIN_DEPENDS roscpp std_msgs
  DEPENDS eigen3  # 系统依赖
)

# 包含目录
include_directories(
  include
  ${catkin_INCLUDE_DIRS}
)

# 编译库
add_library(${PROJECT_NAME}
  src/algorithm.cpp
)

# 编译可执行文件
add_executable(robot_node src/main.cpp)
target_link_libraries(robot_node
  ${PROJECT_NAME}
  ${catkin_LIBRARIES}
)

# 安装规则
install(TARGETS robot_node
  RUNTIME DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)

包依赖管理

package.xml 结构

<?xml version="1.0"?>
<package format="2">
  <name>my_robot_package</name>
  <version>1.0.0</version>
  <description>机器人控制包</description>
  
  <maintainer email="dev@robot.com">Developer</maintainer>
  <license>MIT</license>
  
  <!-- 构建依赖 -->
  <buildtool_depend>catkin</buildtool_depend>
  <build_depend>roscpp</build_depend>
  
  <!-- 运行依赖 -->
  <exec_depend>roscpp</exec_depend>
  <exec_depend>rospy</exec_depend>
  
  <!-- 测试依赖 -->
  <test_depend>rostest</test_depend>
</package>

依赖解析算法

  1. 拓扑排序确定构建顺序
  2. 检测循环依赖
  3. 并行构建无依赖关系的包

构建优化技巧

1. 并行构建加速

# 使用所有 CPU 核心
catkin_make -j$(nproc)

# 或使用 catkin_tools(推荐)
catkin build --jobs $(nproc)

2. 增量构建优化

# 只构建修改的包
catkin build --this

# 构建指定包及其依赖
catkin build package_name --deps

3. ccache 加速重复编译

# 安装 ccache
sudo apt-get install ccache

# 配置 catkin 使用 ccache
export CC="ccache gcc"
export CXX="ccache g++"

参数服务器与动态配置

参数服务器架构

ROS1 的参数服务器是一个中心化的配置存储系统,运行在 Master 节点上。它使用层次化的命名空间存储键值对。

参数类型支持

命名空间层次结构

/
├── robot_name              # 全局参数
├── /navigation/
│   ├── max_velocity        # 导航模块参数
│   ├── planner/
│   │   ├── algorithm       # 规划器配置
│   │   └── resolution
│   └── controller/
│       └── gains           # 控制器参数
└── /perception/
    ├── camera/
    │   └── fps
    └── lidar/
        └── range

参数操作 API

参数读写操作

# Python API
import rospy

# 读取参数
max_vel = rospy.get_param('/navigation/max_velocity', 1.0)  # 带默认值
params = rospy.get_param('/navigation/')  # 获取整个命名空间

# 写入参数
rospy.set_param('/navigation/max_velocity', 2.0)

# 删除参数
rospy.delete_param('/navigation/obsolete_param')

# 检查参数存在
if rospy.has_param('/navigation/max_velocity'):
    # 参数存在
    pass

# C++ API
ros::NodeHandle nh;
double max_vel;
nh.getParam("/navigation/max_velocity", max_vel);
nh.setParam("/navigation/max_velocity", 2.0);

私有参数与相对命名

# 私有参数(节点命名空间)
rospy.init_node('my_node')
# 参数实际路径:/my_node/param_name
private_param = rospy.get_param('~param_name')

# 相对参数(当前命名空间)
# 如果当前命名空间是 /robot1/
relative_param = rospy.get_param('sensor/range')
# 实际路径:/robot1/sensor/range

动态重配置(Dynamic Reconfigure)

动态重配置是 ROS1 的一个强大特性,允许运行时修改参数而无需重启节点。

配置文件定义(.cfg)

#!/usr/bin/env python
from dynamic_reconfigure.parameter_generator_catkin import *

gen = ParameterGenerator()

# 添加参数:名称、类型、级别、描述、默认值、最小值、最大值
gen.add("max_velocity", double_t, 0, 
        "Maximum velocity", 1.0, 0.0, 5.0)
gen.add("enable_obstacle_avoidance", bool_t, 0,
        "Enable obstacle avoidance", True)

# 枚举类型
algorithm_enum = gen.enum([
    gen.const("DWA", int_t, 0, "Dynamic Window Approach"),
    gen.const("TEB", int_t, 1, "Timed Elastic Band"),
    gen.const("MPC", int_t, 2, "Model Predictive Control")
], "Planning algorithm selection")

gen.add("algorithm", int_t, 0, 
        "Path planning algorithm", 0, 0, 2, 
        edit_method=algorithm_enum)

exit(gen.generate("my_package", "my_node", "MyConfig"))

节点实现动态重配置

#include <dynamic_reconfigure/server.h>
#include <my_package/MyConfigConfig.h>

class MyNode {
private:
    dynamic_reconfigure::Server<my_package::MyConfigConfig> server_;
    
    void configCallback(my_package::MyConfigConfig &config, uint32_t level) {
        // 更新内部参数
        max_velocity_ = config.max_velocity;
        use_obstacle_avoidance_ = config.enable_obstacle_avoidance;
        
        // 级别检查(哪些参数改变了)
        if (level & 0x1) {
            // 速度参数改变
            updateVelocityController();
        }
    }
    
public:
    MyNode() {
        // 设置回调
        server_.setCallback(
            boost::bind(&MyNode::configCallback, this, _1, _2));
    }
};

参数服务器性能分析

性能特征

参数读取延迟 = 网络往返时间 + XMLRPC 解析
             ≈ 1-5ms(局域网)

批量操作优化:
- 单个参数读取:N 次网络往返
- 命名空间读取:1 次网络往返
- 推荐:启动时批量读取,缓存在本地

缓存策略

class ParameterCache:
    def __init__(self, namespace):
        # 启动时批量读取
        self.params = rospy.get_param(namespace)
        self.namespace = namespace
        
    def get(self, key, default=None):
        # 从本地缓存读取
        return self.params.get(key, default)
    
    def refresh(self):
        # 定期刷新缓存
        self.params = rospy.get_param(self.namespace)

产业案例研究:Willow Garage PR2 机器人系统架构

PR2 系统概述

PR2(Personal Robot 2)是 Willow Garage 开发的双臂移动服务机器人,是 ROS1 发展史上的里程碑项目。其系统架构充分展示了 ROS1 在复杂机器人系统中的应用。

硬件规格

软件架构设计

节点拓扑结构

                    PR2 ROS 节点架构
    +------------------------------------------------+
    |                  高层任务规划                    |
    |    task_executive    move_base    manipulation  |
    +------------------------------------------------+
                            |
    +------------------------------------------------+
    |                   中间件服务                     |
    |  tf  robot_state  diagnostics  power_management |
    +------------------------------------------------+
                            |
    +------------------------------------------------+
    |                  硬件抽象层                       |
    |   pr2_controller_manager    pr2_ethercat        |
    +------------------------------------------------+
                            |
    +------------------------------------------------+
    |                  驱动程序层                       |
    | motor_drivers  sensor_drivers  camera_drivers   |
    +------------------------------------------------+

关键设计决策

  1. 实时控制回路分离
    • 1kHz 电机控制回路运行在实时内核
    • 100Hz 运动规划运行在普通用户空间
    • 通过共享内存传递控制命令
  2. 传感器数据流水线
    激光雷达 (40Hz) ─┐
                  ├─> 传感器融合 ─> 八叉树地图 ─> 导航规划
    立体相机 (30Hz) ─┘                    |
                                      v
    Kinect (30Hz) ────> 点云处理 ─> 物体识别 ─> 抓取规划
    
  3. 分布式计算架构
    • 主计算机:高层规划、传感器融合
    • 从计算机:图像处理、点云处理
    • 实时控制器:电机控制、安全监控

通信模式选择策略

PR2 在不同场景下选择不同的 ROS1 通信机制:

话题使用场景

服务使用场景

动作使用场景

性能优化实践

1. 消息传输优化

// 使用 nodelet 减少数据拷贝
class ImageProcessingNodelet : public nodelet::Nodelet {
    // 同进程内使用指针传递,避免序列化
    void imageCallback(const sensor_msgs::ImageConstPtr& msg) {
        // 零拷贝处理
    }
};

2. 话题分流策略

# 高频数据使用独立话题
/base_scan          # 40Hz 激光数据
/base_scan_filtered # 10Hz 滤波数据(导航使用)
/base_scan_marking  # 5Hz 障碍标记(建图使用)

3. 参数服务器优化

# 启动时批量加载参数
rosparam:
  - file: config/navigation.yaml
    ns: /move_base
  - file: config/manipulation.yaml  
    ns: /arm_controller

故障处理与恢复

PR2 实现了多层次的故障检测与恢复机制:

1. 硬件层安全机制

2. 软件层监控

# 诊断聚合器配置
analyzers:
  motors:
    type: diagnostic_aggregator/GenericAnalyzer
    path: Motors
    contains: ['motor_']
    timeout: 5.0
    
  sensors:
    type: diagnostic_aggregator/GenericAnalyzer
    path: Sensors
    contains: ['laser', 'camera', 'imu']
    timeout: 2.0

3. 系统级恢复策略

经验教训总结

PR2 项目为 ROS2 的设计提供了宝贵经验:

  1. Master 单点故障:PR2 在实际部署中多次遇到 Master 崩溃导致全系统失效
  2. 实时性不足:TCPROS 的不确定延迟影响控制性能
  3. 安全性缺失:缺乏认证机制,任何节点都可以控制机器人
  4. 资源开销大:每个节点都是独立进程,内存和 CPU 开销显著

高级话题:ROS1 分布式系统优化与多 Master 方案

分布式系统性能优化

网络拓扑优化

在大规模机器人系统中,网络拓扑设计直接影响系统性能:

星型 vs 网状拓扑

星型拓扑(中心化):           网状拓扑(分布式):
     Master                   Node1 ←→ Node2
    /   |   \                   ↑  ×  ↓
Node1 Node2 Node3             Node3 ←→ Node4

延迟:O(1)跳                 延迟:O(log n)跳
带宽:受中心限制              带宽:多路径均衡
容错:单点故障                容错:多路径冗余

优化策略

  1. 话题路由优化:根据数据流量模式调整网络拓扑
  2. 局部性原理:相关节点部署在同一子网
  3. 带宽预留:为关键数据流预留网络带宽

消息传输优化技术

1. 消息批处理(Message Batching)

class BatchedPublisher {
private:
    std::vector<sensor_msgs::PointCloud2> batch_;
    ros::Timer batch_timer_;
    
    void batchTimerCallback(const ros::TimerEvent&) {
        if (!batch_.empty()) {
            // 打包发送
            custom_msgs::PointCloudBatch msg;
            msg.clouds = batch_;
            batch_pub_.publish(msg);
            batch_.clear();
        }
    }
    
public:
    void addToBatch(const sensor_msgs::PointCloud2& cloud) {
        batch_.push_back(cloud);
        if (batch_.size() >= BATCH_SIZE) {
            // 立即发送
            batchTimerCallback(ros::TimerEvent());
        }
    }
};

2. 压缩传输

import rospy
from sensor_msgs.msg import CompressedImage
import cv2

# 发布压缩图像
def publish_compressed(image):
    # JPEG 压缩
    _, compressed = cv2.imencode('.jpg', image, 
                                 [cv2.IMWRITE_JPEG_QUALITY, 80])
    msg = CompressedImage()
    msg.data = compressed.tostring()
    msg.format = "jpeg"
    compressed_pub.publish(msg)

3. 共享内存传输(同机优化)

// 使用 nodelet 实现零拷贝
namespace my_package {
class ProcessingNodelet : public nodelet::Nodelet {
    void onInit() override {
        // 订阅和发布使用共享指针
        sub_ = nh_.subscribe<sensor_msgs::PointCloud2>(
            "input", 1, 
            boost::bind(&ProcessingNodelet::callback, this, _1));
    }
    
    void callback(const sensor_msgs::PointCloud2ConstPtr& msg) {
        // 直接操作指针,无需拷贝
        processCloud(msg);
    }
};
}

多 Master 架构方案

方案一:Foreign Relay

Foreign Relay 是最简单的多 Master 连接方案:

# foreign_relay.py
import rospy
from std_msgs.msg import String

class ForeignRelay:
    def __init__(self, foreign_master_uri, local_topic, foreign_topic):
        # 连接到外部 Master
        self.foreign_master = xmlrpclib.ServerProxy(foreign_master_uri)
        
        # 本地发布者
        self.local_pub = rospy.Publisher(local_topic, String, queue_size=10)
        
        # 定期拉取外部话题
        self.timer = rospy.Timer(rospy.Duration(0.1), self.relay_callback)
        
    def relay_callback(self, event):
        # 从外部 Master 获取数据
        data = self.fetch_foreign_topic()
        if data:
            self.local_pub.publish(data)

优缺点分析

方案二:Multimaster FKIE

Multimaster FKIE 是功能最完整的多 Master 解决方案:

<!-- multimaster.launch -->
<launch>
  <!-- Master 发现节点 -->
  <node name="master_discovery" pkg="master_discovery_fkie" 
        type="master_discovery">
    <param name="mcast_group" value="224.0.0.1"/>
    <param name="mcast_port" value="11511"/>
    <param name="robot_hosts" value="[robot1, robot2, robot3]"/>
  </node>
  
  <!-- Master 同步节点 -->
  <node name="master_sync" pkg="master_sync_fkie" 
        type="master_sync">
    <!-- 同步规则配置 -->
    <rosparam>
      sync_topics: ['/sensor_data', '/robot_status']
      sync_services: ['/get_plan', '/compute_ik']
      ignore_nodes: ['/rosout', '/diagnostic_agg']
    </rosparam>
  </node>
</launch>

架构设计

  机器人1                    机器人2
+----------+              +----------+
| Master 1 |←---发现---→| Master 2 |
+----------+              +----------+
     ↑                         ↑
     |同步                     |同步
     ↓                         ↓
+----------+              +----------+
| 节点组 1  |←---数据---→| 节点组 2  |
+----------+              +----------+

关键特性

  1. 自动发现:使用组播 UDP 自动发现其他 Master
  2. 选择性同步:可配置同步规则
  3. 冲突解决:处理命名冲突和时钟同步

方案三:ROS1 Gateway(面向云机器人)

针对云机器人场景的网关架构:

class ROSGateway:
    def __init__(self):
        self.local_master = "http://localhost:11311"
        self.cloud_endpoint = "wss://cloud.robot.com/ros"
        
        # WebSocket 连接到云端
        self.ws = websocket.WebSocketApp(
            self.cloud_endpoint,
            on_message=self.on_cloud_message,
            on_error=self.on_error
        )
        
        # 话题过滤器(减少带宽)
        self.topic_filters = {
            '/camera/image': self.compress_image,
            '/laser/scan': self.downsample_scan,
            '/tf': self.filter_tf
        }
    
    def compress_image(self, msg):
        # H.264 编码
        return encode_h264(msg)
    
    def downsample_scan(self, msg):
        # 降采样激光数据
        return msg[::2]  # 每隔一个点
    
    def filter_tf(self, msg):
        # 只发送关键坐标系
        key_frames = ['map', 'odom', 'base_link']
        return [tf for tf in msg if tf.child_frame in key_frames]

实时性增强技术

RT-PREEMPT 内核集成

# 安装 RT-PREEMPT 内核
sudo apt-get install linux-image-rt-amd64

# 配置实时优先级
cat << EOF > /etc/security/limits.d/ros-rt.conf
@ros-rt - rtprio 98
@ros-rt - memlock unlimited
EOF

# 将用户添加到实时组
sudo usermod -a -G ros-rt $USER

实时节点模板

class RealtimeNode {
private:
    struct sched_param param_;
    
public:
    void setupRealtimePriority() {
        // 设置 FIFO 调度策略
        param_.sched_priority = 80;
        if (sched_setscheduler(0, SCHED_FIFO, &param_) == -1) {
            ROS_ERROR("Failed to set realtime priority");
        }
        
        // 锁定内存,防止换页
        if (mlockall(MCL_CURRENT | MCL_FUTURE) == -1) {
            ROS_ERROR("Failed to lock memory");
        }
        
        // 预分配内存
        preallocateMemory();
    }
    
    void preallocateMemory() {
        // 预分配所有可能使用的内存
        message_pool_.reserve(1000);
        // 触碰每一页,确保物理内存分配
        for (auto& msg : message_pool_) {
            memset(&msg, 0, sizeof(msg));
        }
    }
};

确定性通信保证

1. 时间触发通信

class TimeTriggeredPublisher {
private:
    ros::Timer timer_;
    std::atomic<bool> data_ready_{false};
    sensor_msgs::JointState latest_msg_;
    
public:
    TimeTriggeredPublisher(ros::NodeHandle& nh) {
        // 固定周期发布
        timer_ = nh.createTimer(
            ros::Duration(0.001),  // 1kHz
            &TimeTriggeredPublisher::timerCallback, 
            this);
    }
    
    void timerCallback(const ros::TimerEvent& event) {
        if (data_ready_) {
            pub_.publish(latest_msg_);
            
            // 监控抖动
            double jitter = (event.current_real - event.current_expected).toSec();
            if (std::abs(jitter) > 0.0001) {  // 100us
                ROS_WARN("Timer jitter: %.6f", jitter);
            }
        }
    }
};

2. 优先级队列管理

template<typename T>
class PriorityMessageQueue {
private:
    struct PriorityMessage {
        int priority;
        T message;
        ros::Time timestamp;
        
        bool operator<(const PriorityMessage& other) const {
            return priority < other.priority;
        }
    };
    
    std::priority_queue<PriorityMessage> queue_;
    std::mutex mutex_;
    
public:
    void push(const T& msg, int priority) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push({priority, msg, ros::Time::now()});
        
        // 限制队列大小,丢弃低优先级旧消息
        while (queue_.size() > MAX_QUEUE_SIZE) {
            queue_.pop();
        }
    }
};

论文导读

关键论文推荐

  1. “ROS: an open-source Robot Operating System” (Quigley et al., 2009)
    • ROS 原始设计理念
    • 分布式架构决策依据
    • 早期应用案例
  2. “Performance Evaluation of ROS-Based Systems” (Maruyama et al., 2016)
    • ROS1 性能基准测试
    • 瓶颈分析方法
    • 优化建议
  3. “Real-Time ROS Extensions” (Wei et al., 2016)
    • RT-PREEMPT 集成经验
    • 实时性保证机制
    • 工业应用案例

开源项目推荐

本章小结

本章系统回顾了 ROS1 的核心架构和关键设计决策。让我们总结本章的关键要点:

核心概念总结

  1. Master-Slave 架构
    • Master 作为名称服务器,负责节点发现和连接建立
    • 节点间直接通信,Master 不参与数据传输
    • 简化了系统设计,但引入了单点故障风险
  2. 三种通信模式
    • 话题:异步发布-订阅,适合数据流传输
    • 服务:同步请求-响应,适合命令执行
    • 动作:带反馈的异步任务,适合长时间操作
  3. 关键性能公式
    系统延迟 = 网络延迟 + 序列化时间 + 处理时间
       
    其中:
    - 网络延迟 ≈ RTT/2 (局域网 < 1ms)
    - 序列化时间 ≈ 消息大小 / CPU频率 × 复杂度因子
    - 处理时间 = 应用相关
    
  4. Catkin 构建系统
    • 基于 CMake 的包管理系统
    • 支持并行构建和依赖管理
    • 工作空间隔离开发环境
  5. 参数服务器
    • 中心化配置存储
    • 支持动态重配置
    • 层次化命名空间组织

设计权衡分析

设计决策 优势 劣势 ROS2 改进方向
中心化 Master 简单、易理解 单点故障 DDS 分布式发现
XMLRPC 协议 跨语言支持好 性能开销大 DDS-RTPS 二进制协议
进程隔离 故障隔离好 资源开销大 组件化架构
TCPROS 传输 可靠传输 缺乏 QoS 控制 DDS QoS 策略
无安全机制 部署简单 安全风险 SROS2 安全框架

从 ROS1 到 ROS2 的演进动力

通过本章的分析,我们可以看到推动 ROS2 诞生的关键因素:

  1. 可靠性需求:消除 Master 单点故障
  2. 实时性需求:确定性通信和调度
  3. 安全性需求:认证、加密、访问控制
  4. 嵌入式支持:降低资源占用
  5. 产业化需求:生产环境的稳定性和可维护性

练习题

基础题

练习 1.1:Master 故障分析 假设一个 ROS1 系统有 10 个节点正在运行,突然 Master 节点崩溃。请分析: a) 已建立的话题通信是否会中断? b) 新节点能否加入系统? c) 参数服务器的数据会发生什么?

💡 提示 考虑 Master 在连接建立前后的不同作用。
📝 参考答案 a) **已建立的话题通信不会立即中断**。因为节点之间已经建立了直接的 TCPROS 连接,数据传输不经过 Master。但是,如果任何一个节点重启或网络中断后,无法重新建立连接。 b) **新节点无法加入系统**。新节点启动时需要向 Master 注册,如果 Master 不可用,注册会失败,节点无法发现其他节点。 c) **参数服务器数据完全丢失**。参数服务器运行在 Master 进程中,Master 崩溃意味着所有参数数据丢失。除非节点已经缓存了参数,否则无法获取配置信息。 这个问题揭示了 ROS1 的核心架构缺陷,也是 ROS2 采用 DDS 的主要原因之一。

练习 1.2:通信模式选择 为以下场景选择最合适的 ROS1 通信机制(话题/服务/动作),并说明理由: a) 激光雷达数据流(40Hz) b) 获取机器人当前位置 c) 机械臂移动到指定位置 d) 紧急停止命令

💡 提示 考虑数据频率、是否需要响应、执行时间长短。
📝 参考答案 a) **话题**:高频数据流,不需要确认,多个节点可能需要订阅。 b) **服务**:一次性查询,需要立即响应,典型的请求-响应模式。 c) **动作**:长时间执行任务,需要反馈进度,可能需要取消操作。 d) **话题**(特殊情况):虽然是命令,但紧急停止需要广播给所有相关节点,且不能等待响应。使用话题确保最快传递,可设置队列大小为1,使用可靠传输。

练习 1.3:Catkin 工作空间问题 你有两个 Catkin 工作空间:ws1 和 ws2。ws1 中有包 A(版本 1.0),ws2 中也有包 A(版本 2.0)。如果按照 ws1、ws2 的顺序 source 两个工作空间的 setup.bash,运行时会使用哪个版本的包 A?

💡 提示 考虑 ROS_PACKAGE_PATH 的覆盖机制。
📝 参考答案 会使用 **ws2 中的包 A(版本 2.0)**。 原因:后 source 的工作空间会覆盖先前的设置。当 source ws2/devel/setup.bash 时,ws2 的路径会被添加到 ROS_PACKAGE_PATH 的前面,因此 ROS 会优先找到 ws2 中的包。 可以通过以下命令验证: ```bash echo $ROS_PACKAGE_PATH rospack find A # 会显示 ws2 中的路径 ``` 这种机制允许开发者通过工作空间覆盖系统包,方便开发和测试。

挑战题

练习 1.4:性能优化方案设计 某机器人系统有一个相机节点发布 1920×1080 的 RGB 图像(30 FPS),三个处理节点订阅这些图像。当前架构导致 CPU 使用率过高,网络带宽接近饱和。请设计一个优化方案,要求:

💡 提示 考虑 nodelet、图像压缩、感兴趣区域(ROI)等技术。
📝 参考答案 **综合优化方案**: 1. **使用 Nodelet 架构**(减少 CPU 30%): - 将相机节点和处理节点改写为 nodelet - 在同一进程中运行,使用指针传递,避免 3 次图像拷贝 - 节省序列化/反序列化开销 2. **图像压缩传输**(减少带宽 60%): - 对于需要网络传输的部分,使用 JPEG 压缩 - 压缩质量设为 85,视觉损失最小 - 原始:1920×1080×3×30 = 186 MB/s - 压缩后:约 75 MB/s 3. **智能处理策略**(减少 CPU 20%,带宽 10%): - 实现图像金字塔,低分辨率预处理 - 只对感兴趣区域进行全分辨率处理 - 使用时间相关性,只处理变化区域 4. **硬件加速**(额外优化): - 使用 OpenCV 的 CUDA 支持 - 图像预处理使用 GPU 实施代码框架: ```cpp class OptimizedImagePipeline : public nodelet::Nodelet { void onInit() { // 共享内存发布 image_pub_ = nh_.advertise<sensor_msgs::Image>("image", 1); compressed_pub_ = nh_.advertise<sensor_msgs::CompressedImage>("image/compressed", 1); } void imageCallback(const sensor_msgs::ImageConstPtr& msg) { // 本地 nodelet 直接传递指针 image_pub_.publish(msg); // 远程节点发送压缩版本 if (compressed_pub_.getNumSubscribers() > 0) { publishCompressed(msg); } } }; ```

练习 1.5:分布式系统设计 设计一个多机器人 SLAM 系统,要求:

💡 提示 考虑多 Master 方案、地图融合策略、冲突解决机制。
📝 参考答案 **分布式 SLAM 系统架构**: 1. **多 Master 配置**: ```yaml # 使用 multimaster_fkie robot1: master_discovery: robot_hosts: [robot1, robot2, robot3] heartbeat_hz: 0.5 master_sync: sync_topics: ['/map_exchange', '/loop_closure'] sync_services: ['/merge_maps'] ``` 2. **地图数据结构**: ```cpp struct DistributedMap { std::string robot_id; ros::Time timestamp; nav_msgs::OccupancyGrid local_map; geometry_msgs::Pose origin_in_global; std::vector loop_closures; }; ``` 3. **地图融合策略**: - 每个机器人维护局部地图 - 定期(1Hz)交换地图增量 - 使用图优化融合全局地图 - 冲突区域使用概率融合 4. **网络分区处理**: ```python class PartitionHandler: def __init__(self): self.last_seen = {} self.local_buffer = [] def handle_partition(self): # 检测分区 for robot in self.robot_list: if time.now() - self.last_seen[robot] > timeout: self.enter_degraded_mode(robot) # 缓存本地更新 self.buffer_local_updates() # 分区恢复后同步 if self.partition_healed(): self.sync_buffered_data() ``` 5. **一致性保证**: - 使用逻辑时钟(Lamport 时间戳) - 向量时钟追踪因果关系 - 最终一致性模型 这个设计确保了系统的鲁棒性和可扩展性,即使在网络不稳定的情况下也能继续工作。 </details> **练习 1.6:实时控制系统设计** 设计一个 1kHz 机械臂控制回路,要求: - 最大延迟 < 1ms - 抖动 < 100μs - 与 ROS1 导航栈集成 - 支持力控制模式
💡 提示 考虑实时内核、共享内存、优先级反转等问题。
📝 参考答案 **实时控制系统设计**: 1. **系统架构分层**: ``` 用户空间 (ROS1) 实时空间 (RT) ┌─────────────┐ ┌──────────────┐ │ Navigation │ │ RT Controller│ │ Planner │<------->│ 1 kHz │ └─────────────┘ 共享内存 └──────────────┘ ↓ ┌──────────────┐ │ EtherCAT Bus │ │ <100 μs │ └──────────────┘ ``` 2. **实时控制器实现**: ```cpp class RTArmController { private: // 实时安全的数据结构 struct RTControlData { std::atomic target_pos[7]; std::atomic target_vel[7]; std::atomic force_limit[7]; std::atomic timestamp; }; RTControlData* shared_mem_; public: void rtControlLoop() { // 设置 CPU 亲和性 cpu_set_t cpuset; CPU_ZERO(&cpuset); CPU_SET(3, &cpuset); // 独占 CPU3 pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset); // 实时调度 struct sched_param param = {.sched_priority = 99}; sched_setscheduler(0, SCHED_FIFO, &param); // 控制循环 while (running_) { auto start = rtClock::now(); // 1. 读取传感器(EtherCAT) readSensors(); // <50μs // 2. 控制计算 computeControl(); // <200μs // 3. 发送命令 sendCommands(); // <50μs // 4. 监控时序 auto elapsed = rtClock::now() - start; if (elapsed > 900μs) { rtLog::warn("Cycle overrun: {} μs", elapsed); } // 精确睡眠到下一周期 rtClock::sleepUntil(start + 1ms); } } }; ``` 3. **ROS1 集成层**: ```cpp class ROSRTBridge { void trajectoryCallback(const trajectory_msgs::JointTrajectory& msg) { // 轨迹插值到 1kHz auto interpolated = interpolateTrajectory(msg, 1000); // 写入共享内存(无锁) for (size_t i = 0; i < interpolated.size(); ++i) { shared_mem_->target_pos[i].store(interpolated[i].position); shared_mem_->target_vel[i].store(interpolated[i].velocity); } shared_mem_->timestamp.store(rtClock::now()); } }; ``` 4. **力控制实现**: ```cpp void computeForceControl() { // 读取力/力矩传感器 Vector6d wrench = readFTSensor(); // 导纳控制 Vector6d delta_x = admittance_matrix_ * (wrench - target_wrench_); // 笛卡尔到关节空间 VectorXd delta_q = jacobian_.inverse() * delta_x; // 应用控制 for (int i = 0; i < 7; ++i) { joint_cmd_[i] = joint_pos_[i] + delta_q[i]; } } ``` 这个设计通过分离实时和非实时部分,使用共享内存通信,避免了 ROS1 的不确定性,同时保持了与 ROS 生态的兼容性。 </details> ## 常见陷阱与错误(Gotchas) ### 1. 网络配置错误 **问题**:多机通信时节点无法互相发现 ```bash # 错误配置 export ROS_MASTER_URI=http://192.168.1.100:11311 export ROS_HOSTNAME=localhost # 错误!其他机器无法解析 ``` **解决方案**: ```bash # 正确配置 export ROS_MASTER_URI=http://192.168.1.100:11311 export ROS_IP=192.168.1.101 # 使用实际 IP # 或 export ROS_HOSTNAME=robot1 # 确保所有机器的 /etc/hosts 中有此条目 ``` ### 2. 话题名称不匹配 **问题**:发布者和订阅者话题不匹配,但难以发现 ```cpp // 节点 A ros::Publisher pub = nh.advertise<std_msgs::String>("/robot/status", 10); // 节点 B ros::Subscriber sub = nh.subscribe("/robot_status", 10, callback); // 拼写错误! ``` **调试技巧**: ```bash # 检查话题连接 rostopic info /robot/status rosnode info /node_name rqt_graph # 可视化节点连接 ``` ### 3. 消息类型版本不一致 **问题**:自定义消息修改后,忘记重新编译所有依赖包 ``` # Package A 定义消息 # Package B 使用旧版本消息 # 运行时出现序列化错误 ``` **预防措施**: ```bash # 清理并重新构建所有包 catkin clean catkin build # 或使用依赖追踪 catkin build --force-cmake ``` ### 4. 回调队列阻塞 **问题**:在回调函数中执行耗时操作 ```cpp void imageCallback(const sensor_msgs::Image& msg) { // 错误:阻塞回调队列 cv::Mat result = expensiveImageProcessing(msg); // 耗时 500ms processed_pub.publish(result); } ``` **正确方式**: ```cpp void imageCallback(const sensor_msgs::Image& msg) { // 将数据推入处理队列 image_queue_.push(msg); cv_.notify_one(); // 通知处理线程 } void processingThread() { while (ros::ok()) { sensor_msgs::Image img; if (image_queue_.pop(img)) { // 在独立线程处理 auto result = expensiveImageProcessing(img); processed_pub.publish(result); } } } ``` ### 5. 参数服务器竞态条件 **问题**:多个节点同时修改参数 ```python # 节点 A current = rospy.get_param('/counter') rospy.set_param('/counter', current + 1) # 非原子操作! # 节点 B 同时执行相同代码 # 结果:计数器可能只增加 1 而不是 2 ``` **解决方案**: 使用分布式锁或改用服务实现原子操作。 ### 6. tf 时间戳问题 **问题**:使用错误的时间戳查询 tf ```cpp // 错误:使用当前时间查询历史变换 tf::StampedTransform transform; listener.lookupTransform("map", "base_link", ros::Time::now(), transform); // 可能抛出异常:extrapolation into the future ``` **正确方式**: ```cpp // 使用 Time(0) 获取最新可用变换 listener.lookupTransform("map", "base_link", ros::Time(0), transform); // 或等待变换可用 listener.waitForTransform("map", "base_link", stamp, ros::Duration(0.1)); ``` ## 最佳实践检查清单 ### 系统设计审查 - [ ] **单点故障分析** - 识别所有单点故障(Master、关键节点) - 设计故障恢复机制 - 实施健康检查和自动重启 - [ ] **性能需求评估** - 明确延迟和带宽要求 - 选择合适的通信模式 - 考虑是否需要实时性保证 - [ ] **扩展性设计** - 节点功能单一职责 - 使用命名空间组织话题 - 预留配置和接口扩展点 ### 开发实践 - [ ] **消息设计** - 优先使用标准消息类型 - 自定义消息保持向后兼容 - 避免过度嵌套的消息结构 - [ ] **节点实现** - 实现优雅关闭(SIGINT 处理) - 添加诊断信息发布 - 使用 ROS 日志系统 - [ ] **参数管理** - 使用 YAML 文件组织参数 - 实施参数验证 - 支持动态重配置(如适用) ### 测试策略 - [ ] **单元测试** - 使用 rostest 框架 - 模拟外部依赖 - 测试异常情况 - [ ] **集成测试** - 测试节点间通信 - 验证时序要求 - 测试网络故障恢复 - [ ] **性能测试** - 测量消息延迟 - 监控 CPU 和内存使用 - 压力测试(高频率、大消息) ### 部署准备 - [ ] **文档完善** - README 包含依赖和构建说明 - 记录所有话题/服务/参数 - 提供 launch 文件示例 - [ ] **配置管理** - 环境相关配置外部化 - 使用 roslaunch 参数覆盖 - 版本控制配置文件 - [ ] **监控部署** - 配置诊断聚合器 - 设置日志轮转 - 实施性能监控 ### 安全考虑 - [ ] **网络安全** - 限制 Master 访问(防火墙) - 使用 VPN 跨网络通信 - 验证输入数据合法性 - [ ] **故障安全** - 实施紧急停止机制 - 添加传感器数据合理性检查 - 设计降级运行模式 通过遵循这个检查清单,可以构建更加健壮、可维护的 ROS1 系统,同时为将来迁移到 ROS2 打下良好基础。