In modern robotic systems, visual perception is the key capability behind autonomous navigation, object manipulation, and scene understanding. This chapter takes a deep dive into integrating state-of-the-art computer vision and deep learning models into ROS2, from classical 2D object detection through complex 3D scene understanding to edge-computing optimization. We pay particular attention to the engineering realities: performance optimization, real-time guarantees, and system-integration challenges.
After completing this chapter, you will be able to build ROS2 detection, segmentation, and 3D perception nodes, optimize them for real-time inference, and deploy them on edge hardware.
The YOLO (You Only Look Once) family is the most widely used object-detection framework in robot vision systems. From YOLOv1 to the latest YOLOv9, each generation has brought significant performance gains:
YOLOv8 architecture highlights:
┌─────────────────────────────────────┐
│ Backbone (CSPDarknet) │
│ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │Conv │→ │ C2f │→ │SPPF │ │
│ └─────┘ └─────┘ └─────┘ │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Neck (PAN-FPN) │
│ ┌─────────────────────────┐ │
│ │ Multi-scale Feature │ │
│ │ Aggregation │ │
│ └─────────────────────────┘ │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ Head (Decoupled) │
│ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │ Cls │ │ Reg │ │ DFL │ │
│ └──────┘ └──────┘ └──────┘ │
└─────────────────────────────────────┘
The key performance trade-off across model sizes is speed versus accuracy: the nano variant runs fastest with the lowest mAP, while the larger variants trade latency for accuracy.
Designing an efficient YOLO ROS2 node calls for the following architecture:
# yolo_detector_node.py
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, CompressedImage
from vision_msgs.msg import Detection2DArray, Detection2D
from cv_bridge import CvBridge
import torch
from ultralytics import YOLO
import numpy as np
class YOLODetectorNode(Node):
    def __init__(self):
        super().__init__('yolo_detector')
        # Declare parameters
        self.declare_parameter('model_path', 'yolov8n.pt')
        self.declare_parameter('confidence_threshold', 0.5)
        self.declare_parameter('nms_threshold', 0.45)
        self.declare_parameter('device', 'cuda:0')
        self.declare_parameter('input_size', [640, 640])
        self.declare_parameter('enable_tracking', False)
        # Initialize the model
        model_path = self.get_parameter('model_path').value
        device = self.get_parameter('device').value
        self.model = YOLO(model_path)
        self.model.to(device)
        # Warm up the model (an important latency optimization: the first CUDA call is slow)
        dummy_input = torch.zeros((1, 3, 640, 640)).to(device)
        _ = self.model(dummy_input)
        # Publishers and subscribers
        self.detection_pub = self.create_publisher(
            Detection2DArray, 'detections', 10)
        self.viz_pub = self.create_publisher(
            Image, 'detection_viz', 10)
        self.image_sub = self.create_subscription(
            Image, 'image_raw', self.image_callback, 10)
        self.bridge = CvBridge()
    def image_callback(self, msg):
        # Start a latency timer
        start_time = self.get_clock().now()
        # Convert the ROS image to an OpenCV BGR array
        cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
        # YOLO inference
        results = self.model(cv_image,
            conf=self.get_parameter('confidence_threshold').value,
            iou=self.get_parameter('nms_threshold').value)
        # Convert to ROS2 detection messages
        detection_array = self.create_detection_array(results[0], msg.header)
        self.detection_pub.publish(detection_array)
        # Publish visualization only if someone is subscribed
        if self.viz_pub.get_subscription_count() > 0:
            viz_image = results[0].plot()
            viz_msg = self.bridge.cv2_to_imgmsg(viz_image, 'bgr8')
            viz_msg.header = msg.header
            self.viz_pub.publish(viz_msg)
        # Log inference latency
        inference_time = (self.get_clock().now() - start_time).nanoseconds / 1e6
        self.get_logger().debug(f'Inference time: {inference_time:.2f}ms')
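The node above has no entry point, and create_detection_array (mapping Ultralytics results to a Detection2DArray) is assumed to be defined on the class. A minimal sketch of the standard rclpy entry point:

def main(args=None):
    rclpy.init(args=args)
    node = YOLODetectorNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()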
Achieving real-time performance requires optimization at multiple levels:
class BatchedYOLONode(Node):
    def __init__(self):
        super().__init__('batched_yolo')
        # (model, bridge, and publishers are initialized as in YOLODetectorNode)
        self.batch_size = 4
        self.image_buffer = []
        self.header_buffer = []
        # Timer that flushes partial batches at ~30 FPS
        self.batch_timer = self.create_timer(0.033, self.process_batch)  # 30 FPS
    def image_callback(self, msg):
        self.image_buffer.append(msg)
        self.header_buffer.append(msg.header)
        if len(self.image_buffer) >= self.batch_size:
            self.process_batch()
    def process_batch(self):
        if not self.image_buffer:
            return
        # Batched inference over all buffered frames
        batch_images = [self.bridge.imgmsg_to_cv2(img) for img in self.image_buffer]
        results = self.model(batch_images)
        # Publish per-frame results
        for result, header in zip(results, self.header_buffer):
            detection_array = self.create_detection_array(result, header)
            self.detection_pub.publish(detection_array)
        # Clear the buffers
        self.image_buffer.clear()
        self.header_buffer.clear()
import threading
from queue import Queue, Empty

class AsyncYOLONode(Node):
    def __init__(self):
        super().__init__('async_yolo')
        self.input_queue = Queue(maxsize=10)
        self.output_queue = Queue(maxsize=10)
        # Background inference thread
        self.inference_thread = threading.Thread(target=self.inference_worker)
        self.inference_thread.daemon = True
        self.inference_thread.start()
        # Timer that publishes completed results
        self.publish_timer = self.create_timer(0.001, self.publish_results)
    def inference_worker(self):
        while rclpy.ok():
            try:
                # Block briefly rather than busy-waiting
                image, header = self.input_queue.get(timeout=0.1)
            except Empty:
                continue
            result = self.model(image)
            self.output_queue.put((result, header))
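The publish_results callback wired to the timer above is not shown; a minimal sketch, assuming the same create_detection_array helper and detection_pub publisher as in YOLODetectorNode:

    def publish_results(self):
        # Drain completed results without blocking the executor
        while not self.output_queue.empty():
            result, header = self.output_queue.get()
            detection_array = self.create_detection_array(result, header)
            self.detection_pub.publish(detection_array)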
Integrating DeepSORT or ByteTrack provides stable multi-object tracking:
from deep_sort_realtime.deepsort_tracker import DeepSort
class YOLOTrackerNode(YOLODetectorNode):
def __init__(self):
super().__init__()
self.tracker = DeepSort(max_age=30, n_init=3)
    def process_detections(self, detections, image):
        # Prepare DeepSORT input tuples: ([left, top, width, height], confidence, class)
        bbs = []
        for det in detections:
            bbox = [det.x, det.y, det.width, det.height]
            confidence = det.confidence
            class_id = det.class_id
            bbs.append((bbox, confidence, class_id))
        # Update the tracker with the current frame
        tracks = self.tracker.update_tracks(bbs, frame=image)
        # Publish confirmed tracks
        tracked_detections = Detection2DArray()
        for track in tracks:
            if not track.is_confirmed():
                continue
            detection = Detection2D()
            detection.id = str(track.track_id)
            ltrb = track.to_ltrb()  # [left, top, right, bottom]
            detection.bbox.center.position.x = (ltrb[0] + ltrb[2]) / 2.0
            detection.bbox.center.position.y = (ltrb[1] + ltrb[3]) / 2.0
            detection.bbox.size_x = ltrb[2] - ltrb[0]
            detection.bbox.size_y = ltrb[3] - ltrb[1]
            tracked_detections.detections.append(detection)
        return tracked_detections
SAM (Segment Anything) provides powerful zero-shot segmentation:
from segment_anything import sam_model_registry, SamAutomaticMaskGenerator
class SAMSegmentationNode(Node):
    def __init__(self):
        super().__init__('sam_segmentation')
        # Load the SAM model
        self.sam = sam_model_registry["vit_h"](checkpoint="sam_vit_h.pth")
        self.sam.to('cuda')
        # Automatic mask generator
        self.mask_generator = SamAutomaticMaskGenerator(
            self.sam,
            points_per_side=32,
            pred_iou_thresh=0.86,
            stability_score_thresh=0.92,
            min_mask_region_area=100
        )
        # ROS2 interface
        self.mask_pub = self.create_publisher(
            Image, 'segmentation_masks', 10)
    def segment_image(self, image):
        # Generate masks
        masks = self.mask_generator.generate(image)
        # Sort masks by area, largest first
        masks = sorted(masks, key=lambda x: x['area'], reverse=True)
        # Build a color visualization
        mask_image = np.zeros_like(image)
        for i, mask in enumerate(masks):
            color = self.get_color(i)
            mask_image[mask['segmentation']] = color
        return mask_image, masks
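For robot manipulation it is often more useful to prompt SAM with a single point (a user click or a detector centroid) than to generate every mask. A sketch using the SamPredictor API from the same package; segment_at_point is an illustrative helper, not part of the library:

from segment_anything import SamPredictor

def segment_at_point(sam, image, x, y):
    """Return the best mask for a single foreground point prompt."""
    predictor = SamPredictor(sam)
    predictor.set_image(image)  # expects an RGB uint8 image
    masks, scores, _ = predictor.predict(
        point_coords=np.array([[x, y]]),
        point_labels=np.array([1]),  # 1 = foreground point
        multimask_output=True)
    return masks[scores.argmax()]  # keep the highest-scoring mask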
import torchvision.models.segmentation as segmentation
class DeepLabNode(Node):
    def __init__(self):
        super().__init__('deeplab_segmentation')
        # Load the pretrained model
        self.model = segmentation.deeplabv3_resnet101(pretrained=True)
        self.model.eval()
        self.model.cuda()
        # Class mapping (Cityscapes labels; note that torchvision's pretrained
        # weights use the Pascal VOC label set, so this mapping assumes
        # fine-tuned Cityscapes weights)
        self.class_names = ['road', 'sidewalk', 'building', 'wall',
                            'fence', 'pole', 'traffic_light', 'traffic_sign',
                            'vegetation', 'terrain', 'sky', 'person',
                            'rider', 'car', 'truck', 'bus', 'train',
                            'motorcycle', 'bicycle']
    @torch.no_grad()
    def segment(self, image):
        # Preprocess
        input_tensor = self.preprocess(image)
        # Inference
        output = self.model(input_tensor)['out'][0]
        output_predictions = output.argmax(0)
        # Postprocess
        segmentation_map = output_predictions.byte().cpu().numpy()
        return segmentation_map
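The preprocess helper above is left undefined; a minimal sketch using the standard ImageNet normalization expected by torchvision's segmentation models:

    def preprocess(self, image):
        import torchvision.transforms as T
        # OpenCV BGR image -> normalized CUDA tensor with a batch dimension
        transform = T.Compose([
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225]),
        ])
        rgb = image[:, :, ::-1].copy()  # BGR to RGB
        return transform(rgb).unsqueeze(0).cuda()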
Instance segmentation provides pixel-level masks for each object instance:
import detectron2
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
class MaskRCNNNode(Node):
    def __init__(self):
        super().__init__('mask_rcnn')
        # Configure Detectron2 from the model zoo
        cfg = get_cfg()
        cfg.merge_from_file(model_zoo.get_config_file(
            "COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml"))
        cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5
        cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url(
            "COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml")
        cfg.MODEL.DEVICE = "cuda"
        self.predictor = DefaultPredictor(cfg)
        # Publisher (InstanceSegmentation is a custom message type)
        self.instance_pub = self.create_publisher(
            InstanceSegmentation, 'instance_masks', 10)
    def process_image(self, image):
        outputs = self.predictor(image)
        # Extract instances
        instances = outputs["instances"].to("cpu")
        boxes = instances.pred_boxes.tensor.numpy()
        scores = instances.scores.numpy()
        classes = instances.pred_classes.numpy()
        masks = instances.pred_masks.numpy()
        # Build the ROS message
        instance_msg = InstanceSegmentation()
        for i in range(len(instances)):
            instance = Instance()
            instance.class_id = int(classes[i])
            instance.score = float(scores[i])
            instance.bbox = self.create_bbox(boxes[i])
            instance.mask = self.encode_mask(masks[i])
            instance_msg.instances.append(instance)
        return instance_msg
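create_bbox and encode_mask above are assumed helpers; one simple mask encoding packs each boolean mask into a mono8 sensor_msgs/Image via cv_bridge (assumes a self.bridge CvBridge instance):

    def encode_mask(self, mask):
        # Boolean HxW mask -> mono8 Image (255 = object pixel)
        return self.bridge.cv2_to_imgmsg(
            mask.astype(np.uint8) * 255, encoding='mono8')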
Projecting 2D segmentation results into 3D space:
import message_filters
import open3d as o3d
from sensor_msgs.msg import PointCloud2, PointField
from sensor_msgs_py import point_cloud2 as pc2
class Segmentation3DProjector(Node):
    def __init__(self):
        super().__init__('segmentation_3d_projector')
        # Camera intrinsics (fill fx, fy, cx, cy from the CameraInfo topic)
        self.K = np.array([[fx, 0, cx],
                           [0, fy, cy],
                           [0, 0, 1]])
        # Synchronized subscriptions
        self.seg_sub = message_filters.Subscriber(self, Image, 'segmentation')
        self.depth_sub = message_filters.Subscriber(self, Image, 'depth')
        self.sync = message_filters.ApproximateTimeSynchronizer(
            [self.seg_sub, self.depth_sub], 10, 0.05)
        self.sync.registerCallback(self.sync_callback)
        # Point cloud publisher
        self.pc_pub = self.create_publisher(PointCloud2, 'segmented_pointcloud', 10)
    def sync_callback(self, seg_msg, depth_msg):
        # Convert messages
        seg_map = self.bridge.imgmsg_to_cv2(seg_msg)
        depth_map = self.bridge.imgmsg_to_cv2(depth_msg)
        # Back-project pixels into a labeled 3D point cloud
        points = []
        colors = []
        labels = []
        for v in range(depth_map.shape[0]):
            for u in range(depth_map.shape[1]):
                z = depth_map[v, u] / 1000.0  # mm to m
                if z == 0:
                    continue
                # Back-project to 3D
                x = (u - self.K[0, 2]) * z / self.K[0, 0]
                y = (v - self.K[1, 2]) * z / self.K[1, 1]
                points.append([x, y, z])
                labels.append(seg_map[v, u])
                colors.append(self.label_to_color(seg_map[v, u]))
        # Build the PointCloud2 message
        header = seg_msg.header
        fields = [
            PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
            PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
            PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
            PointField(name='rgb', offset=12, datatype=PointField.UINT32, count=1),
            PointField(name='label', offset=16, datatype=PointField.UINT32, count=1)
        ]
        cloud_msg = pc2.create_cloud(header, fields,
            self.pack_points(points, colors, labels))
        self.pc_pub.publish(cloud_msg)
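The per-pixel Python loop above is easy to read but far too slow for VGA depth at frame rate; a vectorized NumPy version of the same back-projection math, as a drop-in sketch:

    def backproject_vectorized(self, depth_map, seg_map):
        # Build the full pixel grid and back-project all valid depths at once
        v, u = np.indices(depth_map.shape)
        z = depth_map.astype(np.float32) / 1000.0  # mm to m
        valid = z > 0
        x = (u - self.K[0, 2]) * z / self.K[0, 0]
        y = (v - self.K[1, 2]) * z / self.K[1, 1]
        points = np.stack([x[valid], y[valid], z[valid]], axis=-1)
        labels = seg_map[valid]
        return points, labels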
PointNet++ is the classic architecture for point-cloud processing:
import torch
import torch.nn as nn
# The set-abstraction and feature-propagation modules used below
# (PointNetSetAbstractionMSG, PointNetFeaturePropagation) come from a
# PointNet++ implementation such as the pointnet2_ops package
from pointnet2_ops import pointnet2_utils
class PointNet2MSG(nn.Module):
"""Multi-Scale Grouping PointNet++"""
def __init__(self, num_classes=40):
super().__init__()
# Set Abstraction layers
self.sa1 = PointNetSetAbstractionMSG(
npoint=512,
radii=[0.1, 0.2, 0.4],
nsamples=[16, 32, 128],
in_channel=3,
mlp_list=[[32, 32, 64], [64, 64, 128], [64, 96, 128]]
)
self.sa2 = PointNetSetAbstractionMSG(
npoint=128,
radii=[0.2, 0.4, 0.8],
nsamples=[32, 64, 128],
in_channel=320,
mlp_list=[[64, 64, 128], [128, 128, 256], [128, 128, 256]]
)
# Feature Propagation layers
self.fp2 = PointNetFeaturePropagation(in_channel=640, mlp=[256, 256])
self.fp1 = PointNetFeaturePropagation(in_channel=256, mlp=[256, 128])
# Classifier
self.classifier = nn.Sequential(
nn.Conv1d(128, 128, 1),
nn.BatchNorm1d(128),
nn.ReLU(),
nn.Dropout(0.5),
nn.Conv1d(128, num_classes, 1)
)
def forward(self, xyz):
# Set abstraction
l1_xyz, l1_points = self.sa1(xyz, None)
l2_xyz, l2_points = self.sa2(l1_xyz, l1_points)
# Feature propagation
l1_points = self.fp2(l1_xyz, l2_xyz, l1_points, l2_points)
l0_points = self.fp1(xyz, l1_xyz, None, l1_points)
# Classification
x = self.classifier(l0_points)
return x
class PointNet2Node(Node):
    def __init__(self):
        super().__init__('pointnet2_node')
        # Load the model
        self.model = PointNet2MSG(num_classes=10)
        self.model.load_state_dict(torch.load('pointnet2_model.pth'))
        self.model.cuda()
        self.model.eval()
        # ROS2 interface
        self.pc_sub = self.create_subscription(
            PointCloud2, 'points_raw', self.pointcloud_callback, 10)
        self.detection_pub = self.create_publisher(
            Detection3DArray, 'detections_3d', 10)
    def pointcloud_callback(self, msg):
        # Convert the point cloud to a tensor
        points = self.pointcloud2_to_tensor(msg)
        # Inference
        with torch.no_grad():
            predictions = self.model(points)
        # Publish results
        detections = self.create_3d_detections(predictions, points)
        self.detection_pub.publish(detections)
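pointcloud2_to_tensor above is left undefined; a minimal sketch using sensor_msgs_py (available in recent ROS2 distributions), assuming an XYZ cloud and random subsampling to a fixed point budget:

    def pointcloud2_to_tensor(self, msg, max_points=16384):
        from sensor_msgs_py import point_cloud2 as pc2
        # Read XYZ fields into an (N, 3) float array
        pts = pc2.read_points_numpy(msg, field_names=('x', 'y', 'z'),
                                    skip_nans=True)
        if pts.shape[0] > max_points:
            idx = np.random.choice(pts.shape[0], max_points, replace=False)
            pts = pts[idx]
        # Return a (1, 3, N) tensor on the GPU
        return torch.from_numpy(pts.astype(np.float32)).T.unsqueeze(0).cuda()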
class VoxelNet(nn.Module):
def __init__(self, voxel_size=[0.2, 0.2, 0.4],
point_cloud_range=[0, -40, -3, 70.4, 40, 1]):
super().__init__()
self.voxel_size = voxel_size
self.point_cloud_range = point_cloud_range
# Voxel Feature Encoding
self.vfe = VoxelFeatureExtractor(
num_filters=[32, 128],
with_distance=False,
voxel_size=voxel_size
)
# Middle Convolutional layers
self.middle_conv = MiddleConv(
in_channels=128,
out_channels=64,
layer_nums=3,
layer_strides=[2, 1, 1],
filter_sizes=[3, 3, 3]
)
# Region Proposal Network
self.rpn = RPN(
in_channels=64,
num_anchors_per_location=2,
box_code_size=7,
num_class=1
)
def forward(self, voxels, num_points, coors):
# VFE
voxel_features = self.vfe(voxels, num_points, coors)
# Sparse to dense
spatial_features = self.sparse_to_dense(voxel_features, coors)
# Middle layers
spatial_features = self.middle_conv(spatial_features)
# RPN
box_preds, cls_preds = self.rpn(spatial_features)
return box_preds, cls_preds
PointPillars offers an excellent speed-accuracy trade-off:
class PointPillars(nn.Module):
def __init__(self, num_classes=3):
super().__init__()
# Pillar Feature Network
self.pfn = PillarFeatureNet(
num_filters=[64],
use_norm=True,
with_distance=False,
voxel_size=(0.16, 0.16, 4),
pc_range=(0, -39.68, -3, 69.12, 39.68, 1)
)
# Backbone: 2D CNN
self.backbone = nn.Sequential(
nn.Conv2d(64, 64, 3, stride=2, padding=1),
nn.BatchNorm2d(64),
nn.ReLU(),
nn.Conv2d(64, 128, 3, stride=2, padding=1),
nn.BatchNorm2d(128),
nn.ReLU(),
nn.Conv2d(128, 256, 3, stride=2, padding=1),
nn.BatchNorm2d(256),
nn.ReLU()
)
# Detection Head
self.head = SSDHead(
in_channels=256,
num_classes=num_classes,
num_anchors_per_location=2
)
def forward(self, pillars, indices):
# Pillar encoding
pillar_features = self.pfn(pillars, indices)
# Scatter to BEV
bev_features = self.scatter_to_bev(pillar_features, indices)
# Backbone
spatial_features = self.backbone(bev_features)
# Detection
cls_preds, box_preds, dir_cls_preds = self.head(spatial_features)
return cls_preds, box_preds, dir_cls_preds
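scatter_to_bev above is the step PointPillars is named for: encoded pillar features are scattered back onto a dense BEV canvas. A sketch, assuming indices holds (batch, y, x) pillar coordinates and the grid size implied by the voxel size and range above (432 × 496 cells):

    def scatter_to_bev(self, pillar_features, indices, nx=432, ny=496):
        # pillar_features: (P, C); indices: (P, 3) as (batch, y, x)
        batch_size = int(indices[:, 0].max().item()) + 1
        C = pillar_features.shape[1]
        canvas = pillar_features.new_zeros(batch_size, C, ny * nx)
        for b in range(batch_size):
            mask = indices[:, 0] == b
            flat = (indices[mask, 1] * nx + indices[mask, 2]).long()
            # Place each pillar's feature vector at its BEV cell
            canvas[b, :, flat] = pillar_features[mask].t()
        return canvas.view(batch_size, C, ny, nx)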
Multi-modal fusion improves detection accuracy:
class PointCloudImageFusion(nn.Module):
    def __init__(self):
        super().__init__()
        # Point-cloud branch
        self.point_branch = PointPillars(num_classes=3)
        # Image branch
        self.image_branch = torchvision.models.resnet50(pretrained=True)
        self.image_branch.fc = nn.Identity()
        # Fusion module
        self.fusion = nn.Sequential(
            nn.Linear(2048 + 256, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.ReLU()
        )
        # Detection head
        self.detection_head = nn.Linear(256, 7 * 3)  # 7 box parameters per box, 3 classes
    def forward(self, points, images, calib):
        # Point-cloud features
        point_features = self.point_branch(points)
        # Image features
        image_features = self.image_branch(images)
        # Project and align the two feature sets using calibration
        aligned_features = self.project_and_align(
            point_features, image_features, calib)
        # Fuse
        fused_features = self.fusion(aligned_features)
        # Detect
        detections = self.detection_head(fused_features)
        return detections
class FusionNode(Node):
    def __init__(self):
        super().__init__('fusion_detection')
        # Synchronized subscriptions
        self.pc_sub = message_filters.Subscriber(
            self, PointCloud2, 'velodyne_points')
        self.img_sub = message_filters.Subscriber(
            self, Image, 'camera/image_raw')
        self.sync = message_filters.ApproximateTimeSynchronizer(
            [self.pc_sub, self.img_sub], 10, 0.05)
        self.sync.registerCallback(self.fusion_callback)
        # Load the fusion model
        self.model = PointCloudImageFusion()
        self.model.load_state_dict(torch.load('fusion_model.pth'))
        self.model.cuda()
        self.model.eval()
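The fusion_callback registered above is not shown; a minimal sketch, assuming a cv_bridge instance, a point-cloud conversion helper like the one earlier in this chapter, stored calibration (self.calib), and a publish_detections helper:

    def fusion_callback(self, pc_msg, img_msg):
        # Convert the synchronized pair and run the fusion model
        points = self.pointcloud2_to_tensor(pc_msg)
        image = self.bridge.imgmsg_to_cv2(img_msg, 'bgr8')
        image_tensor = self.preprocess(image)
        with torch.no_grad():
            detections = self.model(points, image_tensor, self.calib)
        self.publish_detections(detections, pc_msg.header)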
TensorRT provides several optimization techniques:
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
class TensorRTOptimizer:
    def __init__(self, onnx_model_path, precision='fp16'):
        self.logger = trt.Logger(trt.Logger.WARNING)
        self.builder = trt.Builder(self.logger)
        self.config = self.builder.create_builder_config()
        # Select precision
        if precision == 'fp16':
            self.config.set_flag(trt.BuilderFlag.FP16)
        elif precision == 'int8':
            self.config.set_flag(trt.BuilderFlag.INT8)
            self.config.int8_calibrator = self.create_calibrator()
        # Workspace memory limit
        self.config.max_workspace_size = 1 << 30  # 1GB
        # Enforce strict type constraints
        self.config.set_flag(trt.BuilderFlag.STRICT_TYPES)
    def build_engine(self, onnx_path):
        # Parse the ONNX model
        network = self.builder.create_network(
            1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
        parser = trt.OnnxParser(network, self.logger)
        with open(onnx_path, 'rb') as f:
            if not parser.parse(f.read()):
                for error in range(parser.num_errors):
                    print(parser.get_error(error))
                return None
        # Optimization profile
        profile = self.builder.create_optimization_profile()
        # Dynamic input shapes
        input_tensor = network.get_input(0)
        profile.set_shape(input_tensor.name,
                          min=(1, 3, 224, 224),
                          opt=(4, 3, 640, 640),
                          max=(8, 3, 1280, 1280))
        self.config.add_optimization_profile(profile)
        # Build the engine
        engine = self.builder.build_engine(network, self.config)
        return engine
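Typical usage of the optimizer, serializing the built engine to disk for later deployment (file names are illustrative):

optimizer = TensorRTOptimizer('yolov8n.onnx', precision='fp16')
engine = optimizer.build_engine('yolov8n.onnx')
with open('yolov8n.engine', 'wb') as f:
    f.write(engine.serialize())  # ICudaEngine.serialize() returns the engine bytes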
import os
class INT8Calibrator(trt.IInt8EntropyCalibrator2):
    def __init__(self, calibration_data, cache_file):
        super().__init__()
        self.calibration_data = calibration_data
        self.cache_file = cache_file
        self.batch_size = 32
        self.current_index = 0
        # Allocate GPU memory for one calibration batch
        self.device_input = cuda.mem_alloc(
            self.batch_size * 3 * 640 * 640 * 4)  # float32
    def get_batch_size(self):
        return self.batch_size
    def get_batch(self, names):
        if self.current_index >= len(self.calibration_data):
            return None
        # Assemble the next batch
        batch = self.calibration_data[
            self.current_index:self.current_index + self.batch_size]
        self.current_index += self.batch_size
        # Copy to the GPU
        cuda.memcpy_htod(self.device_input, batch)
        return [self.device_input]
    def read_calibration_cache(self):
        if os.path.exists(self.cache_file):
            with open(self.cache_file, 'rb') as f:
                return f.read()
        return None
    def write_calibration_cache(self, cache):
        with open(self.cache_file, 'wb') as f:
            f.write(cache)
from concurrent.futures import Future

class DynamicBatchProcessor:
    def __init__(self, engine_path, max_batch_size=8):
        self.max_batch_size = max_batch_size
        self.engine = self.load_engine(engine_path)
        self.context = self.engine.create_execution_context()
        # Allocate device buffers
        self.buffers = self.allocate_buffers()
        # Batching queue
        self.batch_queue = []
        self.result_futures = []
    def allocate_buffers(self):
        buffers = []
        for binding in self.engine:
            shape = self.engine.get_binding_shape(binding)
            size = trt.volume(shape) * self.max_batch_size
            dtype = trt.nptype(self.engine.get_binding_dtype(binding))
            # Allocate GPU memory sized for the largest batch
            device_mem = cuda.mem_alloc(size * np.dtype(dtype).itemsize)
            buffers.append(device_mem)
        return buffers
    def infer_batch(self, batch_data):
        batch_size = len(batch_data)
        # Set the dynamic batch size
        self.context.set_binding_shape(0, (batch_size, 3, 640, 640))
        # Copy input data
        cuda.memcpy_htod(self.buffers[0], batch_data)
        # Run inference
        self.context.execute_v2(self.buffers)
        # Copy output data
        output_shape = self.context.get_binding_shape(1)
        output = np.empty(output_shape, dtype=np.float32)
        cuda.memcpy_dtoh(output, self.buffers[1])
        return output
    def process_async(self, image):
        future = Future()
        self.batch_queue.append((image, future))
        if len(self.batch_queue) >= self.max_batch_size:
            self._process_batch()
        return future
    def _process_batch(self):
        if not self.batch_queue:
            return
        # Extract the pending batch
        batch_data = [item[0] for item in self.batch_queue]
        futures = [item[1] for item in self.batch_queue]
        # Inference
        results = self.infer_batch(np.stack(batch_data))
        # Distribute results to waiting callers
        for i, future in enumerate(futures):
            future.set_result(results[i])
        # Clear the queue
        self.batch_queue.clear()
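The load_engine helper referenced in the constructor deserializes a prebuilt engine; a minimal sketch:

    def load_engine(self, engine_path):
        # Deserialize a serialized TensorRT engine from disk
        runtime = trt.Runtime(trt.Logger(trt.Logger.WARNING))
        with open(engine_path, 'rb') as f:
            return runtime.deserialize_cuda_engine(f.read())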
Optimizations targeting NVIDIA Jetson:
class JetsonDeployment:
    def __init__(self, model_path):
        self.logger = trt.Logger(trt.Logger.WARNING)
        # Jetson power mode
        self.set_power_mode('MAXN')  # maximum performance
        # Select GPU or DLA
        self.device = 'DLA' if self.has_dla() else 'GPU'
        # TensorRT engine
        self.engine = self.build_jetson_engine(model_path)
    def set_power_mode(self, mode):
        """Set the Jetson power mode (mode IDs vary across Jetson models)"""
        modes = {
            'MAXN': 0,  # maximum performance
            '10W': 1,   # 10W power budget
            '15W': 2,   # 15W power budget
            '30W': 3    # 30W power budget
        }
        os.system(f'sudo nvpmodel -m {modes[mode]}')
        os.system('sudo jetson_clocks')  # lock clocks at maximum frequency
    def has_dla(self):
        """Check for a DLA (Deep Learning Accelerator)"""
        return os.path.exists('/dev/nvhost-nvdla0')
    def build_jetson_engine(self, model_path):
        builder = trt.Builder(self.logger)
        config = builder.create_builder_config()
        # Jetson-specific optimizations
        if self.device == 'DLA':
            config.default_device_type = trt.DeviceType.DLA
            config.DLA_core = 0
        # Use FP16 (more efficient on Jetson)
        config.set_flag(trt.BuilderFlag.FP16)
        # Memory limit (Jetson memory is constrained)
        config.max_workspace_size = 1 << 28  # 256MB
        # Fall back to the GPU for layers the DLA cannot run
        config.set_flag(trt.BuilderFlag.GPU_FALLBACK)
        return self.build_engine(model_path, config)
    def benchmark(self, input_shape=(1, 3, 640, 640), iterations=100):
        """Performance benchmark"""
        import time
        # Warm up
        dummy_input = np.random.randn(*input_shape).astype(np.float32)
        for _ in range(10):
            _ = self.infer(dummy_input)
        # Measure
        start = time.perf_counter()
        for _ in range(iterations):
            _ = self.infer(dummy_input)
        end = time.perf_counter()
        # Statistics
        avg_time = (end - start) / iterations * 1000  # ms
        fps = 1000 / avg_time
        print(f"Average inference time: {avg_time:.2f}ms")
        print(f"FPS: {fps:.1f}")
        # Power monitoring
        power = self.get_power_consumption()
        print(f"Power consumption: {power:.1f}W")
        return {
            'avg_latency_ms': avg_time,
            'fps': fps,
            'power_w': power
        }
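get_power_consumption above is assumed; on many Jetson boards the total board power can be read from the onboard INA3221 sensor via sysfs, though the exact path varies by model (the path below is from a Xavier-class device and is illustrative):

    def get_power_consumption(self):
        # Total board power in watts, read from the INA3221 power monitor
        path = '/sys/bus/i2c/drivers/ina3221x/6-0040/iio:device0/in_power0_input'
        try:
            with open(path) as f:
                return float(f.read()) / 1000.0  # mW to W
        except (OSError, ValueError):
            return float('nan')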
Tesla's Full Self-Driving (FSD) system represents the extreme of vision-only autonomous driving. Unlike most autonomous-driving companies, Tesla removed lidar and millimeter-wave radar entirely, relying on eight cameras alone for 360° perception.
The FSD vision system adopts a revolutionary architecture:
FSD vision architecture:
┌─────────────────────────────────────────────┐
│ Multi-Camera Input (8 cameras) │
│ Front×3 Side×2 Rear×1 Pillar×2 │
└─────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────┐
│ RegNet Backbone (shared) │
│ Efficient feature extraction │
└─────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────┐
│ BiFPN Feature Pyramid │
│ Multi-scale feature aggregation │
└─────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────┐
│ Transformer-based BEV Encoder │
│ Project features to bird's eye view │
└─────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────┐
│ Temporal Fusion (4D) │
│ Aggregate across time frames │
└─────────────────────────────────────────────┘
↓
┌──────────────┬────────────┬────────────────┐
│ Detection │ Tracking │ Planning │
│ Head │ Head │ Head │
└──────────────┴────────────┴────────────────┘
class TeslaFSDOptimizations:
    """Tesla-FSD-style optimization techniques"""
    def __init__(self):
        # 1. Feature caching (TemporalCache is sketched after this class)
        self.feature_cache = TemporalCache(max_frames=10)
        # 2. Sparse convolution
        self.sparse_conv = SparseConvolution()
        # 3. Mixed precision
        self.mixed_precision = True
    def cached_backbone_forward(self, images, timestamps):
        """Cache backbone features to avoid recomputation"""
        features = []
        for img, ts in zip(images, timestamps):
            # Check the cache first
            if ts in self.feature_cache:
                feat = self.feature_cache[ts]
            else:
                # Compute fresh features under autocast (torch.cuda.amp)
                with autocast(enabled=self.mixed_precision):
                    feat = self.backbone(img)
                self.feature_cache[ts] = feat
            features.append(feat)
        return features
    def vector_space_output(self, bev_features):
        """Vector-space outputs instead of dense rasters"""
        # Lanes: polynomial coefficients
        lane_params = self.lane_head(bev_features)  # [N, 4] (a,b,c,d for ax³+bx²+cx+d)
        # Objects: bounding-box corners
        object_corners = self.object_head(bev_features)  # [N, 8] (4 corners × 2)
        # Drivable area: polygon vertices
        drivable_polygon = self.drivable_head(bev_features)  # [M, 2]
        return {
            'lanes': lane_params,
            'objects': object_corners,
            'drivable': drivable_polygon
        }
    def temporal_nms(self, detections, history):
        """Temporal NMS: exploit history for stability"""
        # Associate current detections with the track history
        matched = self.associate(detections, history)
        # Smooth bounding boxes and confidences
        for det, hist in matched:
            det.bbox = 0.7 * det.bbox + 0.3 * hist.bbox
            det.confidence = 0.8 * det.confidence + 0.2 * hist.confidence
        return detections
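TemporalCache and SparseConvolution above are stand-ins rather than library classes; a minimal timestamp-keyed cache with FIFO eviction could look like this:

from collections import OrderedDict

class TemporalCache(OrderedDict):
    """Fixed-capacity cache keyed by frame timestamp."""
    def __init__(self, max_frames=10):
        super().__init__()
        self.max_frames = max_frames
    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        if len(self) > self.max_frames:
            self.popitem(last=False)  # evict the oldest frame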
BEV (bird's-eye-view) perception has become the dominant paradigm in autonomous driving:
class BEVFormer(nn.Module):
    """BEVFormer: Transformer-based BEV perception"""
    def __init__(self, num_cameras=6, bev_h=200, bev_w=200):
        super().__init__()
        self.bev_h, self.bev_w = bev_h, bev_w
        # Image encoder
        self.image_encoder = ResNet50()
        # Learnable BEV queries
        self.bev_queries = nn.Parameter(
            torch.randn(bev_h * bev_w, 256))
        # Spatial Cross-Attention
        self.spatial_cross_attn = SpatialCrossAttention(
            embed_dim=256,
            num_heads=8,
            num_levels=4,
            num_cameras=num_cameras
        )
        # Temporal Self-Attention
        self.temporal_self_attn = TemporalSelfAttention(
            embed_dim=256,
            num_heads=8,
            num_frames=4
        )
        # BEV encoder
        self.bev_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=256, nhead=8),
            num_layers=6
        )
    def forward(self, images, intrinsics, extrinsics, prev_bev=None):
        B, N, C, H, W = images.shape  # B: batch, N: cameras
        # 1. Extract per-camera image features
        image_features = []
        for i in range(N):
            feat = self.image_encoder(images[:, i])
            image_features.append(feat)
        image_features = torch.stack(image_features, dim=1)
        # 2. Initialize BEV queries
        bev_queries = self.bev_queries.unsqueeze(0).repeat(B, 1, 1)
        # 3. Spatial cross-attention:
        #    project BEV queries into the image feature space
        bev_features = self.spatial_cross_attn(
            bev_queries,
            image_features,
            intrinsics,
            extrinsics
        )
        # 4. Temporal self-attention
        if prev_bev is not None:
            bev_features = self.temporal_self_attn(
                bev_features, prev_bev)
        # 5. BEV encoding
        bev_features = self.bev_encoder(bev_features)
        return bev_features.reshape(B, self.bev_h, self.bev_w, -1)
class DETR3D(nn.Module):
    """DETR3D: end-to-end 3D object detection"""
    def __init__(self, num_queries=900, num_classes=10):
        super().__init__()
        # Object queries
        self.object_queries = nn.Parameter(
            torch.randn(num_queries, 256))
        # 3D reference points
        self.reference_points = nn.Linear(256, 3)
        # Transformer decoder
        self.decoder = nn.TransformerDecoder(
            nn.TransformerDecoderLayer(
                d_model=256,
                nhead=8,
                dim_feedforward=2048
            ),
            num_layers=6
        )
        # Prediction heads
        self.class_head = nn.Linear(256, num_classes)
        self.bbox_head = nn.Linear(256, 10)  # (x,y,z,w,l,h,rot,vx,vy,vz)
    def forward(self, multi_view_features, intrinsics, extrinsics):
        B = multi_view_features.shape[0]
        # 1. Initialize queries and reference points
        queries = self.object_queries.unsqueeze(0).repeat(B, 1, 1)
        reference_points = self.reference_points(queries).sigmoid()
        reference_points[..., :2] = reference_points[..., :2] * 2 - 1  # [-1, 1]
        reference_points[..., 2] = reference_points[..., 2] * 6 - 3    # [-3, 3]m
        # 2. Project reference points into each camera view
        projected_points = self.project_3d_to_2d(
            reference_points, intrinsics, extrinsics)
        # 3. Sample image features at the projections
        sampled_features = self.sample_features(
            multi_view_features, projected_points)
        # 4. Transformer decoding
        decoded_features = self.decoder(
            queries, sampled_features)
        # 5. Predict
        class_logits = self.class_head(decoded_features)
        bbox_preds = self.bbox_head(decoded_features)
        return class_logits, bbox_preds
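The projection step is the heart of DETR3D; a sketch of project_3d_to_2d, assuming intrinsics of shape (B, N, 3, 3) and world-to-camera extrinsics of shape (B, N, 4, 4):

    def project_3d_to_2d(self, points, intrinsics, extrinsics):
        # Homogenize the 3D reference points: (B, Q, 3) -> (B, Q, 4)
        pts_h = torch.cat([points, points.new_ones(*points.shape[:-1], 1)], dim=-1)
        # World -> camera frame for every view: (B, N, Q, 4)
        cam_pts = torch.einsum('bnij,bqj->bnqi', extrinsics, pts_h)
        # Pinhole projection, then divide by depth: (B, N, Q, 2)
        uv = torch.einsum('bnij,bnqj->bnqi', intrinsics, cam_pts[..., :3])
        return uv[..., :2] / uv[..., 2:3].clamp(min=1e-5)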
class FlashBEVAttention(nn.Module):
    """Accelerate BEV computation with Flash Attention"""
    def forward(self, q, k, v):
        # Flash Attention: O(N) memory complexity
        # (flash_attn_func comes from the flash-attn package)
        return flash_attn_func(q, k, v, dropout_p=0.1, causal=False)
class DeformableBEVAttention(nn.Module):
    """Deformable attention to reduce computation"""
    def __init__(self, dim=256, n_heads=8, n_points=4):
        super().__init__()
        self.n_heads = n_heads
        self.n_points = n_points
        self.sampling_offsets = nn.Linear(dim, n_heads * n_points * 2)
        self.attention_weights = nn.Linear(dim, n_heads * n_points)
    def forward(self, query, reference_points, value):
        # Predict sampling offsets around each reference point
        sampling_offsets = self.sampling_offsets(query)
        sampling_offsets = sampling_offsets.view(
            -1, self.n_heads, self.n_points, 2)
        # Compute sampling locations
        sampling_locations = reference_points[:, :, None, :] + sampling_offsets
        # Sample and weight
        sampled_value = F.grid_sample(value, sampling_locations)
        attention_weights = self.attention_weights(query).softmax(-1)
        output = (sampled_value * attention_weights).sum(dim=2)
        return output
This chapter has explored the integration of computer vision and deep learning into ROS2: YOLO-based 2D detection and tracking, zero-shot and semantic segmentation (SAM, DeepLab, Mask R-CNN), 3D point-cloud networks (PointNet++, VoxelNet, PointPillars), camera-lidar fusion, TensorRT optimization with INT8 calibration, Jetson edge deployment, and BEV perception for autonomous driving.
Key formulas:

Camera projection model: $\begin{bmatrix} u \\ v \\ 1 \end{bmatrix} = \frac{1}{Z} K \begin{bmatrix} X \\ Y \\ Z \end{bmatrix}$

SAM zero-shot segmentation: $P(\text{mask} \mid \text{image}, \text{prompt}) = \sigma(f_{\theta}(\text{image}, \text{prompt}))$

IoU: $\mathrm{IoU} = \frac{|A \cap B|}{|A \cup B|}$

Soft-NMS score decay: $score_i = score_i \cdot (1 - \mathrm{IoU}(box_i, box_{max}))^{p}$, where $p$ is the softening parameter
GPU memory hygiene: wrap inference in torch.no_grad(), del large tensors promptly, and monitor usage with nvidia-smi.