SceneLoader 详细文档
文件信息
- 路径:
navsim/common/dataloader.py - 作用: 场景数据加载和管理,包括传感器数据和合成场景
- 重要性: ⭐⭐⭐⭐⭐ (数据管道核心)
核心组件
SceneFilter 数据类
@dataclass
class SceneFilter:
num_frames: int # 场景帧数
num_history_frames: int # 历史帧数
frame_interval: int # 帧间隔
has_route: bool # 是否需要路线
max_scenes: Optional[int] # 最大场景数
log_names: Optional[List[str]] # 指定日志名
tokens: Optional[List[str]] # 指定token
synthetic_scene_tokens: Optional[List[str]] # 合成场景token
include_synthetic_scenes: bool # 是否包含合成场景
SceneLoader 类
初始化参数
def __init__(
self,
data_path: Path, # 日志文件夹根目录
original_sensor_path: Path, # 原始传感器数据路径
scene_filter: SceneFilter, # 场景过滤配置
synthetic_sensor_path: Path = None, # 合成传感器数据路径
synthetic_scenes_path: Path = None, # 合成场景路径
sensor_config: SensorConfig = SensorConfig.build_no_sensors()
):
核心属性
scene_frames_dicts: 原始场景帧字典 {token: frame_list}synthetic_scenes: 合成场景字典 {token: [path, log_name]}tokens_stage_one: 第一阶段token集合tokens_stage_two: 第二阶段token集合_sensor_config: 传感器配置_scene_filter: 场景过滤器
核心功能
1. 场景过滤 filter_scenes()
def filter_scenes(
data_path: Path,
scene_filter: SceneFilter
) -> Tuple[Dict[str, FrameList], List[str]]:
过滤流程:
- 日志过滤: 根据log_names筛选日志文件
- 场景分割: 按num_frames和frame_interval分割
- 长度过滤: 排除过短的场景
- 路线过滤: 如需要,排除无路线场景
- Token过滤: 根据指定token筛选
- 数量限制: 达到max_scenes时停止
关键逻辑:
# 场景分割
def split_list(input_list: List, num_frames: int, frame_interval: int):
return [input_list[i:i+num_frames]
for i in range(0, len(input_list), frame_interval)]
# 过滤条件检查
if len(frame_list) < scene_filter.num_frames:
continue # 场景太短
if scene_filter.has_route and len(roadblock_ids) == 0:
continue # 无路线
if filter_tokens and token not in tokens:
continue # token不匹配
2. 合成场景过滤 filter_synthetic_scenes()
def filter_synthetic_scenes(
data_path: Path,
scene_filter: SceneFilter,
stage1_scenes_final_frames_tokens: List[str]
) -> Dict[str, Tuple[Path, str]]:
用途: 加载第二阶段评估的合成场景
过滤条件:
- 属于第一阶段场景的后续场景
- 匹配日志名筛选
- 匹配token筛选(如指定)
3. 数据加载方法
get_scene_from_token(token: str) -> Scene
功能: 根据token加载完整场景
返回: Scene对象,包含所有帧的数据
get_agent_input_from_token(token: str) -> AgentInput
功能: 加载Agent输入数据
包含:
- 当前和历史的自车状态(EgoStatus)
- 传感器数据(相机、LiDAR)
- 静态地图信息
get_scene_indices() -> List[Tuple[str, int]]
功能: 获取所有场景的索引
返回: [(log_name, scene_index), ...]
用途: 并行处理时的任务分配
数据结构
Scene 场景数据
@dataclass
class Scene:
scene_metadata: SceneMetadata # 场景元数据
ego_states: List[EgoStatus] # 自车状态序列
agent_states: List[AgentStates] # 其他智能体状态
cameras: List[Cameras] # 相机数据序列
lidars: List[Lidar] # LiDAR数据序列
traffic_lights: List[TrafficLights] # 交通灯状态
map_api: AbstractMap # 地图API
AgentInput 数据
@dataclass
class AgentInput:
ego_statuses: List[EgoStatus] # 自车状态历史
cameras: List[Cameras] # 相机数据历史
lidars: List[Lidar] # LiDAR数据历史
map_api: AbstractMap # 地图接口
route_roadblock_ids: List[str] # 路线ID
两阶段加载机制
第一阶段(原始场景)
# 加载原始场景
scenes_stage_one = loader.scene_frames_dicts
tokens_stage_one = loader.tokens_stage_one
for token in tokens_stage_one:
agent_input = loader.get_agent_input_from_token(token)
# 评估原始场景...
第二阶段(合成场景)
# 加载合成场景(模拟不同的初始条件)
tokens_stage_two = loader.tokens_stage_two
for token in tokens_stage_two:
scene = loader.get_scene_from_token(token)
# 评估从不同起点开始的场景...
传感器配置
SensorConfig 结构
@dataclass
class SensorConfig:
cameras: Union[bool, List[str]] # 相机配置
lidar_pc: Union[bool, List[int]] # LiDAR配置
@classmethod
def build_all_sensors(cls):
"""加载所有传感器"""
return cls(cameras=True, lidar_pc=True)
@classmethod
def build_no_sensors(cls):
"""不加载传感器"""
return cls(cameras=False, lidar_pc=False)
传感器数据格式
相机数据
Cameras:
- cam_f0: 前置相机
- cam_l0, cam_l1, cam_l2: 左侧相机
- cam_r0, cam_r1, cam_r2: 右侧相机
- cam_b0: 后置相机
每个相机包含:
- image: 图像数组
- intrinsics: 内参矩阵
- extrinsics: 外参矩阵
LiDAR数据
Lidar:
- lidar_pc: 点云数组 [N, 6]
- x, y, z: 3D坐标
- intensity: 强度
- ring: 激光环编号
- lidar_id: LiDAR设备ID
性能优化
1. 懒加载策略
# 仅在需要时加载传感器数据
if self._sensor_config.cameras:
cameras = load_cameras(...)
else:
cameras = None
2. 批量处理
# 一次性加载整个日志文件
scene_dict_list = pickle.load(open(log_pickle_path, "rb"))
# 然后分割成场景
3. 缓存机制
# 缓存已加载的场景
if token in self._scene_cache:
return self._scene_cache[token]
使用示例
基础使用
# 配置场景过滤器
scene_filter = SceneFilter(
num_frames=50, # 5秒场景(10Hz)
num_history_frames=10, # 1秒历史
frame_interval=10, # 每10帧采样一次
has_route=True, # 需要有路线
max_scenes=1000 # 最多1000个场景
)
# 配置传感器
sensor_config = SensorConfig(
cameras=['cam_f0', 'cam_l0', 'cam_r0'], # 只加载前左右相机
lidar_pc=True # 加载LiDAR
)
# 创建加载器
loader = SceneLoader(
data_path=Path('/data/logs'),
original_sensor_path=Path('/data/sensors'),
scene_filter=scene_filter,
sensor_config=sensor_config
)
# 加载数据
for token in loader.tokens:
agent_input = loader.get_agent_input_from_token(token)
# 处理数据...
两阶段评估
# 第一阶段:评估原始场景
for token in loader.tokens_stage_one:
agent_input = loader.get_agent_input_from_token(token)
trajectory = agent.compute_trajectory(agent_input)
score_stage1 = evaluate(trajectory)
# 第二阶段:评估合成场景
for token in loader.tokens_stage_two:
scene = loader.get_scene_from_token(token)
# 从不同起点评估
trajectory = agent.compute_trajectory_from_scene(scene)
score_stage2 = evaluate(trajectory)
# 聚合分数
final_score = aggregate_scores(score_stage1, score_stage2)
错误处理
常见问题
- 文件不存在: 检查路径配置
- 内存溢出: 减少max_scenes或使用批处理
- Token不匹配: 确认token存在于数据集
- 传感器数据缺失: 检查sensor_config配置
调试技巧
# 打印加载信息
print(f"Loaded {len(loader.tokens)} scenes")
print(f"Stage 1: {len(loader.tokens_stage_one)} tokens")
print(f"Stage 2: {len(loader.tokens_stage_two)} tokens")
# 检查单个场景
token = loader.tokens[0]
scene = loader.get_scene_from_token(token)
print(f"Scene frames: {len(scene.ego_states)}")
print(f"Has cameras: {scene.cameras is not None}")
扩展点
- 自定义过滤器: 扩展SceneFilter添加新的过滤条件
- 新传感器类型: 扩展SensorConfig支持新传感器
- 数据增强: 在加载时应用数据增强
- 流式加载: 实现大规模数据的流式处理