PDMScorer 详细文档

文件信息

  • 路径: navsim/planning/simulation/planner/pdm_planner/scoring/pdm_scorer.py
  • 作用: PDM评分系统的核心实现,计算EPDMS各项指标
  • 重要性: ⭐⭐⭐⭐⭐ (评估系统核心)

类结构

PDMScorerConfig 配置类

@dataclass
class PDMScorerConfig:
    # 加权指标权重
    progress_weight: float = 5.0                    # 进度权重
    ttc_weight: float = 5.0                        # 碰撞时间权重
    lane_keeping_weight: float = 2.0               # 车道保持权重
    history_comfort_weight: float = 2.0            # 历史舒适度权重
    two_frame_extended_comfort_weight: float = 2.0 # 扩展舒适度权重

    # 阈值参数
    driving_direction_horizon: float = 1.0         # 行驶方向评估时间窗[s]
    driving_direction_compliance_threshold: float = 2.0  # 合规阈值[m]
    driving_direction_violation_threshold: float = 6.0   # 违规阈值[m]

    stopped_speed_threshold: float = 5e-03         # 停止速度阈值[m/s]
    future_collision_horizon_window: float = 1.0   # 碰撞预测时间窗[s]
    progress_distance_threshold: float = 5.0       # 进度距离阈值[m]
    lane_keeping_deviation_limit: float = 0.5      # 车道偏离限制[m]
    lane_keeping_horizon_window: float = 2.0       # 车道保持时间窗[s]

    # 人类惩罚过滤
    human_penalty_filter: Optional[bool] = None

PDMScorer 主类

核心属性

评分相关

  • _multi_metrics: 乘法指标数组 [4, num_proposals]
  • _weighted_metrics: 加权指标数组 [5, num_proposals]
  • _progress_raw: 原始进度值

状态相关

  • _states: 提议轨迹的状态数组
  • _ego_coords: 自车坐标
  • _ego_polygons: 自车多边形(用于碰撞检测)
  • _ego_areas: 自车区域标记

环境相关

  • _observation: PDM观测对象
  • _centerline: 中心线路径
  • _route_lane_ids: 路线车道ID列表
  • _drivable_area_map: 可行驶区域地图

时间索引

  • _collision_time_idcs: 碰撞时间索引
  • _ttc_time_idcs: TTC违规时间索引

核心方法

1. score_proposals() - 主评分函数

def score_proposals(
    self,
    states: npt.NDArray[np.float64],           # 提议状态
    observation: PDMObservation,                # 观测数据
    centerline: PDMPath,                        # 中心线
    route_lane_ids: List[str],                  # 路线ID
    drivable_area_map: PDMDrivableMap,         # 可行驶区域
    map_parameters: Optional[MapParameters],    # 地图参数
    simulated_agent_detections_tracks: Optional[List[DetectionsTracks]],
    human_past_trajectory: Optional[InterpolatedTrajectory]
) -> List[pd.DataFrame]:

执行流程:

  1. 初始化和重置内部状态
  2. 计算自车区域
  3. 计算乘法指标(4个)
  4. 计算加权指标(5个)
  5. 聚合PDM分数
  6. 返回评分结果

2. 乘法指标计算

_calculate_no_at_fault_collision() - 无责任碰撞

评分规则:

- 无碰撞: 1.0
- 与车辆碰撞: 0.5
- 与行人/自行车碰撞: 0.0

_calculate_drivable_area_compliance() - 可行驶区域合规

评分规则:

- 完全在可行驶区域内: 1.0
- 任何时刻离开可行驶区域: 0.0

_calculate_traffic_light_compliance() - 交通灯合规

评分规则:

- 遵守所有交通灯: 1.0
- 违反红灯: 0.0

_calculate_driving_direction_compliance() - 行驶方向合规

评分规则:

- 正确方向: 1.0
- 逆向行驶轻微: 0.5
- 严重逆向: 0.0

3. 加权指标计算

_calculate_progress() - 进度评分

计算方式:

- 测量沿中心线的前进距离
- 超过阈值得满分否则按比例
- progress = min(1.0, distance / threshold)

_calculate_ttc() - 碰撞时间评分

计算方式:

- TTC > 阈值: 1.0
- TTC  阈值: 0.0
- 考虑未来1秒内的潜在碰撞

_calculate_lane_keeping() - 车道保持评分

计算方式:

- 横向偏离 < 0.5m: 1.0
- 横向偏离  0.5m: 0.0
- 在交叉口禁用此指标

_calculate_history_comfort() - 历史舒适度评分

评估维度:

- 加速度连续性
- 急动度jerk限制
- 与历史轨迹的一致性

评分聚合

PDM分数计算公式

def _aggregate_pdm_scores(self) -> npt.NDArray[np.float64]:
    # 1. 计算乘法部分(所有乘法指标相乘)
    multi_metric_prod = self._multi_metrics.prod(axis=0)

    # 2. 计算加权部分(加权平均)
    weights = self._config.weighted_metrics_array
    weighted_sum = (self._weighted_metrics * weights[:, None]).sum(axis=0)
    weight_total = weights.sum()
    weighted_avg = weighted_sum / weight_total

    # 3. 最终分数 = 乘法部分 × 加权部分
    pdm_scores = multi_metric_prod * weighted_avg

    return pdm_scores

关键数据结构

状态数组格式

states.shape = [num_proposals, num_frames, 4]
# 维度说明:
# - num_proposals: 候选轨迹数量
# - num_frames: 时间帧数(如40帧 = 4秒×10Hz)
# - 4: [x, y, heading, velocity]

指标索引枚举

MultiMetricIndex(乘法指标)

class MultiMetricIndex(IntEnum):
    NO_COLLISION = 0           # 无碰撞
    DRIVABLE_AREA = 1         # 可行驶区域
    DRIVING_DIRECTION = 2     # 行驶方向
    TRAFFIC_LIGHT_COMPLIANCE = 3  # 交通灯

WeightedMetricIndex(加权指标)

class WeightedMetricIndex(IntEnum):
    PROGRESS = 0              # 进度
    TTC = 1                   # 碰撞时间
    LANE_KEEPING = 2          # 车道保持
    HISTORY_COMFORT = 3       # 历史舒适度
    TWO_FRAME_EXTENDED_COMFORT = 4  # 扩展舒适度

人类惩罚过滤机制

过滤逻辑

if self._config.human_penalty_filter and human_violation:
    # 如果人类驾驶员也违规,则不惩罚Agent
    metric_value = 1.0
else:
    # 正常评分
    metric_value = calculate_metric()

应用场景:

  • 避让静态障碍物时的逆向行驶
  • 紧急情况下的违规操作
  • 确保公平评估

性能优化

1. 批量计算

  • 所有提议同时评分
  • 使用NumPy向量化操作
  • 避免Python循环

2. 缓存策略

  • 重用多边形计算结果
  • 缓存静态地图信息
  • 懒加载观测数据

3. 早期终止

  • 检测到严重违规立即返回0分
  • 跳过不必要的后续计算

评分结果格式

PDMResults 数据类

@dataclass
class PDMResults:
    # 乘法指标
    no_at_fault_collisions: float
    drivable_area_compliance: float
    driving_direction_compliance: float
    traffic_light_compliance: float

    # 加权指标
    ego_progress: float
    time_to_collision_within_bound: float
    lane_keeping: float
    history_comfort: float

    # 聚合分数
    pdm_score: float

    # 额外信息
    progress_raw: float
    collision_time: float
    ttc_time: float

使用示例

# 初始化评分器
scorer = PDMScorer(
    proposal_sampling=TrajectorySampling(num_poses=40, interval_length=0.1),
    config=PDMScorerConfig(
        progress_weight=5.0,
        ttc_weight=5.0,
        lane_keeping_weight=2.0
    )
)

# 评分提议轨迹
results = scorer.score_proposals(
    states=proposal_states,          # [N, 40, 4]
    observation=pdm_observation,     
    centerline=centerline_path,
    route_lane_ids=route_ids,
    drivable_area_map=drivable_map
)

# 获取最佳提议
best_idx = np.argmax([r['pdm_score'] for r in results])
best_trajectory = proposal_states[best_idx]

扩展点

  1. 新评分维度: 添加新的MultiMetricIndex或WeightedMetricIndex
  2. 自定义权重: 通过PDMScorerConfig调整各指标权重
  3. 场景特定评分: 根据场景类型动态调整评分策略
  4. 学习型评分: 从数据中学习最优权重配置

注意事项

  1. 数值稳定性: 避免除零错误和数值溢出
  2. 坐标系一致性: 确保所有计算在同一坐标系
  3. 时间同步: 注意不同数据源的时间戳对齐
  4. 边界情况: 处理极端场景(如停车、掉头等)