SceneLoader 详细文档

文件信息

  • 路径: navsim/common/dataloader.py
  • 作用: 场景数据加载和管理,包括传感器数据和合成场景
  • 重要性: ⭐⭐⭐⭐⭐ (数据管道核心)

核心组件

SceneFilter 数据类

@dataclass
class SceneFilter:
    num_frames: int                    # 场景帧数
    num_history_frames: int            # 历史帧数
    frame_interval: int                # 帧间隔
    has_route: bool                    # 是否需要路线
    max_scenes: Optional[int]          # 最大场景数
    log_names: Optional[List[str]]     # 指定日志名
    tokens: Optional[List[str]]        # 指定token
    synthetic_scene_tokens: Optional[List[str]]  # 合成场景token
    include_synthetic_scenes: bool     # 是否包含合成场景

SceneLoader 类

初始化参数

def __init__(
    self,
    data_path: Path,                   # 日志文件夹根目录
    original_sensor_path: Path,        # 原始传感器数据路径
    scene_filter: SceneFilter,         # 场景过滤配置
    synthetic_sensor_path: Path = None,  # 合成传感器数据路径
    synthetic_scenes_path: Path = None,  # 合成场景路径
    sensor_config: SensorConfig = SensorConfig.build_no_sensors()
):

核心属性

  • scene_frames_dicts: 原始场景帧字典 {token: frame_list}
  • synthetic_scenes: 合成场景字典 {token: [path, log_name]}
  • tokens_stage_one: 第一阶段token集合
  • tokens_stage_two: 第二阶段token集合
  • _sensor_config: 传感器配置
  • _scene_filter: 场景过滤器

核心功能

1. 场景过滤 filter_scenes()

def filter_scenes(
    data_path: Path, 
    scene_filter: SceneFilter
) -> Tuple[Dict[str, FrameList], List[str]]:

过滤流程:

  1. 日志过滤: 根据log_names筛选日志文件
  2. 场景分割: 按num_frames和frame_interval分割
  3. 长度过滤: 排除过短的场景
  4. 路线过滤: 如需要,排除无路线场景
  5. Token过滤: 根据指定token筛选
  6. 数量限制: 达到max_scenes时停止

关键逻辑:

# 场景分割
def split_list(input_list: List, num_frames: int, frame_interval: int):
    return [input_list[i:i+num_frames] 
            for i in range(0, len(input_list), frame_interval)]

# 过滤条件检查
if len(frame_list) < scene_filter.num_frames:
    continue  # 场景太短
if scene_filter.has_route and len(roadblock_ids) == 0:
    continue  # 无路线
if filter_tokens and token not in tokens:
    continue  # token不匹配

2. 合成场景过滤 filter_synthetic_scenes()

def filter_synthetic_scenes(
    data_path: Path,
    scene_filter: SceneFilter,
    stage1_scenes_final_frames_tokens: List[str]
) -> Dict[str, Tuple[Path, str]]:

用途: 加载第二阶段评估的合成场景

过滤条件:

  1. 属于第一阶段场景的后续场景
  2. 匹配日志名筛选
  3. 匹配token筛选(如指定)

3. 数据加载方法

get_scene_from_token(token: str) -> Scene

功能: 根据token加载完整场景
返回: Scene对象包含所有帧的数据

get_agent_input_from_token(token: str) -> AgentInput

功能: 加载Agent输入数据
包含:

- 当前和历史的自车状态EgoStatus
- 传感器数据相机LiDAR
- 静态地图信息

get_scene_indices() -> List[Tuple[str, int]]

功能: 获取所有场景的索引
返回: [(log_name, scene_index), ...]
用途: 并行处理时的任务分配

数据结构

Scene 场景数据

@dataclass
class Scene:
    scene_metadata: SceneMetadata      # 场景元数据
    ego_states: List[EgoStatus]        # 自车状态序列
    agent_states: List[AgentStates]    # 其他智能体状态
    cameras: List[Cameras]             # 相机数据序列
    lidars: List[Lidar]               # LiDAR数据序列
    traffic_lights: List[TrafficLights]  # 交通灯状态
    map_api: AbstractMap               # 地图API

AgentInput 数据

@dataclass
class AgentInput:
    ego_statuses: List[EgoStatus]      # 自车状态历史
    cameras: List[Cameras]             # 相机数据历史
    lidars: List[Lidar]               # LiDAR数据历史
    map_api: AbstractMap               # 地图接口
    route_roadblock_ids: List[str]     # 路线ID

两阶段加载机制

第一阶段(原始场景)

# 加载原始场景
scenes_stage_one = loader.scene_frames_dicts
tokens_stage_one = loader.tokens_stage_one

for token in tokens_stage_one:
    agent_input = loader.get_agent_input_from_token(token)
    # 评估原始场景...

第二阶段(合成场景)

# 加载合成场景(模拟不同的初始条件)
tokens_stage_two = loader.tokens_stage_two

for token in tokens_stage_two:
    scene = loader.get_scene_from_token(token)
    # 评估从不同起点开始的场景...

传感器配置

SensorConfig 结构

@dataclass
class SensorConfig:
    cameras: Union[bool, List[str]]    # 相机配置
    lidar_pc: Union[bool, List[int]]   # LiDAR配置

    @classmethod
    def build_all_sensors(cls):
        """加载所有传感器"""
        return cls(cameras=True, lidar_pc=True)

    @classmethod
    def build_no_sensors(cls):
        """不加载传感器"""
        return cls(cameras=False, lidar_pc=False)

传感器数据格式

相机数据

Cameras:

- cam_f0: 前置相机
- cam_l0, cam_l1, cam_l2: 左侧相机
- cam_r0, cam_r1, cam_r2: 右侧相机
- cam_b0: 后置相机

每个相机包含:

- image: 图像数组
- intrinsics: 内参矩阵
- extrinsics: 外参矩阵

LiDAR数据

Lidar:

- lidar_pc: 点云数组 [N, 6]
  - x, y, z: 3D坐标
  - intensity: 强度
  - ring: 激光环编号
  - lidar_id: LiDAR设备ID

性能优化

1. 懒加载策略

# 仅在需要时加载传感器数据
if self._sensor_config.cameras:
    cameras = load_cameras(...)
else:
    cameras = None

2. 批量处理

# 一次性加载整个日志文件
scene_dict_list = pickle.load(open(log_pickle_path, "rb"))
# 然后分割成场景

3. 缓存机制

# 缓存已加载的场景
if token in self._scene_cache:
    return self._scene_cache[token]

使用示例

基础使用

# 配置场景过滤器
scene_filter = SceneFilter(
    num_frames=50,           # 5秒场景(10Hz)
    num_history_frames=10,   # 1秒历史
    frame_interval=10,       # 每10帧采样一次
    has_route=True,          # 需要有路线
    max_scenes=1000          # 最多1000个场景
)

# 配置传感器
sensor_config = SensorConfig(
    cameras=['cam_f0', 'cam_l0', 'cam_r0'],  # 只加载前左右相机
    lidar_pc=True                             # 加载LiDAR
)

# 创建加载器
loader = SceneLoader(
    data_path=Path('/data/logs'),
    original_sensor_path=Path('/data/sensors'),
    scene_filter=scene_filter,
    sensor_config=sensor_config
)

# 加载数据
for token in loader.tokens:
    agent_input = loader.get_agent_input_from_token(token)
    # 处理数据...

两阶段评估

# 第一阶段:评估原始场景
for token in loader.tokens_stage_one:
    agent_input = loader.get_agent_input_from_token(token)
    trajectory = agent.compute_trajectory(agent_input)
    score_stage1 = evaluate(trajectory)

# 第二阶段:评估合成场景
for token in loader.tokens_stage_two:
    scene = loader.get_scene_from_token(token)
    # 从不同起点评估
    trajectory = agent.compute_trajectory_from_scene(scene)
    score_stage2 = evaluate(trajectory)

# 聚合分数
final_score = aggregate_scores(score_stage1, score_stage2)

错误处理

常见问题

  1. 文件不存在: 检查路径配置
  2. 内存溢出: 减少max_scenes或使用批处理
  3. Token不匹配: 确认token存在于数据集
  4. 传感器数据缺失: 检查sensor_config配置

调试技巧

# 打印加载信息
print(f"Loaded {len(loader.tokens)} scenes")
print(f"Stage 1: {len(loader.tokens_stage_one)} tokens")
print(f"Stage 2: {len(loader.tokens_stage_two)} tokens")

# 检查单个场景
token = loader.tokens[0]
scene = loader.get_scene_from_token(token)
print(f"Scene frames: {len(scene.ego_states)}")
print(f"Has cameras: {scene.cameras is not None}")

扩展点

  1. 自定义过滤器: 扩展SceneFilter添加新的过滤条件
  2. 新传感器类型: 扩展SensorConfig支持新传感器
  3. 数据增强: 在加载时应用数据增强
  4. 流式加载: 实现大规模数据的流式处理