第8章:BFS与DFS的选择
在图和树的遍历中,BFS(广度优先搜索)和DFS(深度优先搜索)是两种最基本的搜索策略。它们不仅是算法的工具,更代表了两种不同的问题解决视角。BFS像水波纹一样逐层扩散,保证找到最短路径;DFS像探险家一样深入探索,直到无路可走才回头。理解这两种搜索的本质差异,以及何时选择哪种方法,是解决图类问题的关键。
本章将深入探讨BFS和DFS的核心思想、实现技巧、优化方法,以及它们在不同场景下的选择原则。我们会特别关注如何利用你的函数式编程背景,用递归和不可变性的思维来理解这些算法。同时,我们也会介绍双向BFS和记忆化搜索这两种重要的优化技术。
8.1 BFS:最短路径与层序遍历
核心思想:逐层扩展
BFS的本质是"先访问距离起点近的节点,再访问距离远的节点"。这种逐层扩展的特性使得BFS天然适合解决最短路径问题。从代数角度看,BFS实际上是在构建一个距离函数 $d: V \to \mathbb{N}$,其中 $d(v)$ 表示从起点到节点 $v$ 的最短距离。
BFS的核心数据结构是队列,它保证了访问顺序的正确性。每次从队列头部取出一个节点,将其未访问的邻居加入队列尾部,这样就确保了按距离递增的顺序访问所有节点。
起点 -> 距离1的节点 -> 距离2的节点 -> ... -> 距离k的节点
^ ^
| |
队列头部 队列尾部
标准BFS模板
def bfs(start, target):
queue = collections.deque([start])
visited = {start}
level = 0
while queue:
# 处理当前层的所有节点
for _ in range(len(queue)):
node = queue.popleft()
if node == target:
return level
# 将下一层节点加入队列
for neighbor in get_neighbors(node):
if neighbor not in visited:
visited.add(neighbor)
queue.append(neighbor)
level += 1
return -1 # 未找到目标
这个模板的关键点:
- visited集合的更新时机:在节点入队时就标记为已访问,而不是出队时,避免重复入队
- 层级遍历:通过
for _ in range(len(queue))确保每次处理完整的一层 - 状态表示:节点可以是简单的数字,也可以是复杂的元组
(x, y, state)
最短路径的数学保证
BFS能够找到最短路径的原因可以用归纳法证明:
定理:在无权图中,BFS第一次访问节点 $v$ 时,找到的就是从起点 $s$ 到 $v$ 的最短路径。
证明:
- 基础情况:$d(s) = 0$,显然成立
- 归纳假设:对于所有 $d(u) < k$ 的节点 $u$,BFS正确计算了最短距离
- 归纳步骤:考虑 $d(v) = k$ 的节点 $v$,存在某个 $d(u) = k-1$ 的节点 $u$ 使得 $(u,v) \in E$
- 根据归纳假设,$u$ 在第 $k-1$ 层被正确访问
- $v$ 将在第 $k$ 层被访问(如果还未被访问)
- 不可能在更早的层访问 $v$,否则 $d(v) < k$,矛盾
例题1:二叉树的锯齿形层序遍历 (#103)
这道题要求层序遍历,但奇数层从左到右,偶数层从右到左。关键是理解这只是输出顺序的变化,遍历逻辑不变。
def zigzagLevelOrder(root):
if not root:
return []
result = []
queue = collections.deque([root])
left_to_right = True
while queue:
level_size = len(queue)
level_values = []
for _ in range(level_size):
node = queue.popleft()
level_values.append(node.val)
if node.left:
queue.append(node.left)
if node.right:
queue.append(node.right)
# 根据方向决定是否反转
if not left_to_right:
level_values.reverse()
result.append(level_values)
left_to_right = not left_to_right
return result
注意这里我们不改变BFS的遍历顺序,只是在输出时根据层号决定是否反转。这体现了"分离关注点"的设计原则。
例题2:单词接龙 (#127)
给定两个单词和一个字典,找出从起始词到结束词的最短转换序列长度,每次只能改变一个字母。
这是典型的最短路径问题,每个单词是一个节点,如果两个单词只差一个字母则有边相连。
def ladderLength(beginWord, endWord, wordList):
if endWord not in wordList:
return 0
# 预处理:构建通配符字典
# 例如:hit -> *it, h*t, hi*
word_dict = collections.defaultdict(list)
for word in wordList:
for i in range(len(word)):
pattern = word[:i] + '*' + word[i+1:]
word_dict[pattern].append(word)
queue = collections.deque([(beginWord, 1)])
visited = {beginWord}
while queue:
word, level = queue.popleft()
# 生成所有可能的通配符模式
for i in range(len(word)):
pattern = word[:i] + '*' + word[i+1:]
# 找到所有匹配的单词
for neighbor in word_dict[pattern]:
if neighbor == endWord:
return level + 1
if neighbor not in visited:
visited.add(neighbor)
queue.append((neighbor, level + 1))
return 0
这个解法的巧妙之处在于通配符模式的使用,避免了 $O(n^2 \cdot m)$ 的暴力比较($n$ 是单词数,$m$ 是单词长度)。
状态空间的BFS
有时候节点不是简单的值,而是复杂的状态。比如在迷宫问题中,状态可能包括位置和已收集的钥匙。
# 状态:(x, y, keys_collected)
def shortestPathAllKeys(grid):
m, n = len(grid), len(grid[0])
all_keys = 0
start = None
# 找到起点和所有钥匙
for i in range(m):
for j in range(n):
if grid[i][j] == '@':
start = (i, j)
elif grid[i][j] in 'abcdef':
all_keys |= 1 << (ord(grid[i][j]) - ord('a'))
# BFS:状态 = (x, y, 拥有的钥匙集合)
queue = collections.deque([(start[0], start[1], 0)])
visited = {(start[0], start[1], 0)}
steps = 0
while queue:
for _ in range(len(queue)):
x, y, keys = queue.popleft()
if keys == all_keys:
return steps
for dx, dy in [(0,1), (1,0), (0,-1), (-1,0)]:
nx, ny = x + dx, y + dy
if 0 <= nx < m and 0 <= ny < n:
cell = grid[nx][ny]
if cell == '#': # 墙
continue
if cell in 'ABCDEF': # 门
if not (keys & (1 << (ord(cell) - ord('A')))):
continue # 没有对应钥匙
new_keys = keys
if cell in 'abcdef': # 钥匙
new_keys |= 1 << (ord(cell) - ord('a'))
if (nx, ny, new_keys) not in visited:
visited.add((nx, ny, new_keys))
queue.append((nx, ny, new_keys))
steps += 1
return -1
这个例子展示了如何用位运算压缩状态空间,这是处理集合状态的常用技巧。
8.2 DFS:连通性与拓扑排序
DFS的递归本质
DFS的本质是"一条路走到黑"的探索策略。从函数式编程的角度看,DFS是一个天然的递归过程:处理当前节点,然后递归处理所有未访问的邻居。这种递归结构使得DFS特别适合处理树形结构和需要回溯的问题。
从代数角度,DFS构建了一棵深度优先搜索树(DFS Tree),定义了节点之间的父子关系。这棵树的边可以分类为:
- 树边(Tree Edge):DFS树中的边
- 后向边(Back Edge):指向祖先的边,表示存在环
- 前向边(Forward Edge):指向后代的边
- 横跨边(Cross Edge):连接不同子树的边
标准DFS模板
# 递归版本 - 更自然、更易理解
def dfs_recursive(node, visited, graph):
visited.add(node)
# 处理当前节点
process(node)
# 递归访问所有未访问的邻居
for neighbor in graph[node]:
if neighbor not in visited:
dfs_recursive(neighbor, visited, graph)
# 迭代版本 - 使用栈模拟递归
def dfs_iterative(start, graph):
stack = [start]
visited = set()
while stack:
node = stack.pop()
if node in visited:
continue
visited.add(node)
process(node)
# 注意:这里的顺序可能与递归版本不同
for neighbor in graph[node]:
if neighbor not in visited:
stack.append(neighbor)
递归版本更符合DFS的本质,而迭代版本在处理深度很大的图时可以避免栈溢出。
连通性问题:岛屿数量 (#200)
给定一个二维网格,'1'表示陆地,'0'表示水,计算岛屿的数量。这是DFS最经典的应用之一。
def numIslands(grid):
if not grid:
return 0
m, n = len(grid), len(grid[0])
count = 0
def dfs(i, j):
# 边界检查和访问标记
if i < 0 or i >= m or j < 0 or j >= n or grid[i][j] != '1':
return
# 标记为已访问(沉没岛屿)
grid[i][j] = '0'
# 递归探索四个方向
dfs(i + 1, j)
dfs(i - 1, j)
dfs(i, j + 1)
dfs(i, j - 1)
# 遍历网格,发现新岛屿就DFS探索
for i in range(m):
for j in range(n):
if grid[i][j] == '1':
count += 1
dfs(i, j)
return count
这个解法的优雅之处在于直接修改原数组作为访问标记,避免了额外的空间开销。从函数式编程的角度,每次DFS调用都是一个"消耗"岛屿的过程。
拓扑排序:课程表 (#207)
给定课程总数和先修课程关系,判断是否可能完成所有课程。这本质上是检测有向图中是否存在环。
DFS解法利用了三色标记法:
- 白色(0):未访问
- 灰色(1):正在访问(在当前DFS路径上)
- 黑色(2):已完成访问
def canFinish(numCourses, prerequisites):
# 构建邻接表
graph = collections.defaultdict(list)
for course, prereq in prerequisites:
graph[prereq].append(course)
# 0: 未访问, 1: 访问中, 2: 已访问
state = [0] * numCourses
def has_cycle(course):
if state[course] == 1: # 灰色节点,发现环
return True
if state[course] == 2: # 黑色节点,已处理
return False
state[course] = 1 # 标记为灰色
# 递归检查所有后继课程
for next_course in graph[course]:
if has_cycle(next_course):
return True
state[course] = 2 # 标记为黑色
return False
# 检查每个未访问的课程
for course in range(numCourses):
if state[course] == 0:
if has_cycle(course):
return False
return True
这个算法的时间复杂度是 $O(V + E)$,其中 $V$ 是节点数,$E$ 是边数。关键洞察是:如果在DFS过程中遇到了灰色节点,说明形成了环。
拓扑排序的完整实现
如果需要输出拓扑排序的结果,可以在DFS完成时记录节点:
def findOrder(numCourses, prerequisites):
graph = collections.defaultdict(list)
for course, prereq in prerequisites:
graph[prereq].append(course)
state = [0] * numCourses
result = []
def dfs(course):
if state[course] == 1: # 检测到环
return False
if state[course] == 2: # 已访问
return True
state[course] = 1
for next_course in graph[course]:
if not dfs(next_course):
return False
state[course] = 2
result.append(course) # 后序位置添加
return True
for course in range(numCourses):
if state[course] == 0:
if not dfs(course):
return []
return result[::-1] # 反转得到拓扑序
注意结果需要反转,因为DFS的后序遍历给出的是逆拓扑序。
DFS在树中的应用模式
DFS在树中有三种主要的处理位置:
def dfs_tree(node):
if not node:
return
# 前序位置:处理节点本身
pre_process(node)
dfs_tree(node.left)
# 中序位置:处理左子树和右子树之间
in_process(node)
dfs_tree(node.right)
# 后序位置:处理完所有子树后
post_process(node)
不同的问题需要在不同的位置处理:
- 前序:自顶向下传递信息
- 后序:自底向上收集信息
- 中序:主要用于BST的有序遍历
例题:二叉树的最大路径和 (#124)
这是一个需要在后序位置处理的典型例子:
def maxPathSum(root):
max_sum = float('-inf')
def max_gain(node):
nonlocal max_sum
if not node:
return 0
# 递归计算左右子树的最大贡献
left_gain = max(max_gain(node.left), 0)
right_gain = max(max_gain(node.right), 0)
# 当前节点作为路径最高点的路径和
current_max = node.val + left_gain + right_gain
max_sum = max(max_sum, current_max)
# 返回当前节点的最大贡献(只能选一边)
return node.val + max(left_gain, right_gain)
max_gain(root)
return max_sum
这个解法的关键是理解两个概念:
- 最大路径和:可以不经过根节点
- 最大贡献值:必须经过当前节点向上延伸的最大值
8.3 双向BFS优化
双向搜索的数学原理
传统BFS从起点开始,搜索空间以指数级增长。如果目标在第 $d$ 层,需要探索 $O(b^d)$ 个节点($b$ 是分支因子)。双向BFS同时从起点和终点搜索,当两个搜索相遇时停止。
数学上的优化效果:
- 单向BFS:搜索 $b^d$ 个节点
- 双向BFS:搜索 $2 \cdot b^{d/2}$ 个节点
- 优化比例:$\frac{b^d}{2 \cdot b^{d/2}} = \frac{b^{d/2}}{2}$
当 $d$ 较大时,这是一个巨大的改进。例如,如果 $b=10, d=6$,单向需要搜索100万个节点,双向只需要2000个。
双向BFS的实现策略
核心思想是维护两个搜索前沿(frontier),每次扩展较小的前沿以保持平衡:
def bidirectional_bfs(start, target, get_neighbors):
if start == target:
return 0
# 两个搜索前沿
front_frontier = {start}
back_frontier = {target}
# 访问记录
front_visited = {start}
back_visited = {target}
distance = 0
while front_frontier and back_frontier:
distance += 1
# 优化:总是扩展较小的前沿
if len(front_frontier) > len(back_frontier):
front_frontier, back_frontier = back_frontier, front_frontier
front_visited, back_visited = back_visited, front_visited
# 扩展当前前沿
next_frontier = set()
for node in front_frontier:
for neighbor in get_neighbors(node):
# 相遇检查
if neighbor in back_frontier:
return distance
if neighbor not in front_visited:
front_visited.add(neighbor)
next_frontier.add(neighbor)
front_frontier = next_frontier
return -1 # 无法到达
例题1:打开转盘锁 (#752)
有一个4位密码锁,每次可以转动一位数字(0-9循环),给定死锁列表,求从"0000"到目标的最少步数。
def openLock(deadends, target):
if "0000" in deadends:
return -1
if target == "0000":
return 0
dead = set(deadends)
def get_neighbors(state):
neighbors = []
for i in range(4):
digit = int(state[i])
# 向上转
up = state[:i] + str((digit + 1) % 10) + state[i+1:]
if up not in dead:
neighbors.append(up)
# 向下转
down = state[:i] + str((digit - 1) % 10) + state[i+1:]
if down not in dead:
neighbors.append(down)
return neighbors
# 双向BFS
begin_set = {"0000"}
end_set = {target}
visited = {"0000", target}
steps = 0
while begin_set and end_set:
# 总是从较小的集合开始扩展
if len(begin_set) > len(end_set):
begin_set, end_set = end_set, begin_set
temp = set()
for state in begin_set:
for next_state in get_neighbors(state):
if next_state in end_set:
return steps + 1
if next_state not in visited:
visited.add(next_state)
temp.add(next_state)
begin_set = temp
steps += 1
return -1
这个实现的关键优化:
- 使用集合而不是队列,便于相遇检查
- 动态选择较小的前沿进行扩展
- 预先过滤死锁状态,避免无效探索
例题2:单词接龙II (#126)
找出所有从beginWord到endWord的最短转换序列。这题需要记录路径,增加了复杂度。
def findLadders(beginWord, endWord, wordList):
if endWord not in wordList:
return []
wordList = set(wordList)
wordList.add(beginWord)
# 构建邻接表
neighbors = collections.defaultdict(list)
for word in wordList:
for i in range(len(word)):
pattern = word[:i] + '*' + word[i+1:]
neighbors[pattern].append(word)
def get_neighbors(word):
result = []
for i in range(len(word)):
pattern = word[:i] + '*' + word[i+1:]
for neighbor in neighbors[pattern]:
if neighbor != word:
result.append(neighbor)
return result
# 双向BFS找最短路径长度
def bidirectional_bfs():
begin_set = {beginWord}
end_set = {endWord}
visited = set()
# 记录每个节点的前驱(用于构建路径)
parents = collections.defaultdict(list)
found = False
backward = False
while begin_set and end_set and not found:
# 标记已访问
visited.update(begin_set)
# 选择较小的集合
if len(begin_set) > len(end_set):
begin_set, end_set = end_set, begin_set
backward = not backward
temp = set()
for word in begin_set:
for neighbor in get_neighbors(word):
if neighbor in end_set:
found = True
if neighbor not in visited:
temp.add(neighbor)
# 记录父子关系
if backward:
parents[neighbor].append(word)
else:
parents[word].append(neighbor)
begin_set = temp
return found, parents
found, parents = bidirectional_bfs()
if not found:
return []
# DFS构建所有路径
def build_paths(word, path, results):
if word == endWord:
results.append(path[:])
return
for next_word in parents[word]:
path.append(next_word)
build_paths(next_word, path, results)
path.pop()
results = []
build_paths(beginWord, [beginWord], results)
return results
双向BFS的适用条件
双向BFS并非总是最优选择,它适用于:
- 已知起点和终点:必须明确知道目标位置
- 可逆的状态转换:能够从终点反向搜索
- 搜索空间大但路径短:优化效果与路径长度成指数关系
- 状态空间对称:正向和反向的分支因子相近
不适用的情况:
- 目标是满足某个条件而非特定状态
- 反向搜索困难或不可能
- 路径很短(开销可能超过收益)
8.4 记忆化搜索:DFS+缓存
记忆化的本质
记忆化搜索是DFS与动态规划的完美结合。从函数式编程的角度看,它是将纯函数(相同输入总是产生相同输出)的结果缓存起来,避免重复计算。这特别适合具有重叠子问题的递归搜索。
数学上,记忆化搜索是在计算一个函数 $f: S \to V$,其中 $S$ 是状态空间,$V$ 是值域。通过缓存已计算的 $(s, f(s))$ 对,将时间复杂度从指数级降到多项式级。
记忆化搜索的模板
def memoized_search(initial_state):
cache = {}
def dfs(state):
# 检查缓存
if state in cache:
return cache[state]
# 基础情况
if is_base_case(state):
return base_value(state)
# 递归计算
result = float('inf') # 或 float('-inf'),取决于问题
for next_state in get_next_states(state):
result = min(result, dfs(next_state) + cost) # 或 max
# 缓存结果
cache[state] = result
return result
return dfs(initial_state)
Python的 @functools.lru_cache 装饰器提供了优雅的实现:
@functools.lru_cache(maxsize=None)
def dfs(state):
# 递归逻辑
pass
例题1:矩阵中的最长递增路径 (#329)
给定矩阵,找出最长递增路径的长度。这是记忆化搜索的经典应用。
def longestIncreasingPath(matrix):
if not matrix:
return 0
m, n = len(matrix), len(matrix[0])
@functools.lru_cache(maxsize=None)
def dfs(i, j):
# 从位置(i,j)出发的最长递增路径
max_length = 1
for di, dj in [(0,1), (1,0), (0,-1), (-1,0)]:
ni, nj = i + di, j + dj
if 0 <= ni < m and 0 <= nj < n and matrix[ni][nj] > matrix[i][j]:
max_length = max(max_length, 1 + dfs(ni, nj))
return max_length
# 尝试从每个位置出发
return max(dfs(i, j) for i in range(m) for j in range(n))
这个解法的优雅之处:
- 不需要显式的访问标记(递增条件保证无环)
- 每个状态只计算一次
- 时间复杂度从 $O(2^{mn})$ 降到 $O(mn)$
例题2:单词拆分 (#139)
判断字符串是否可以拆分成字典中的单词。这可以看作在字符串上的路径搜索。
def wordBreak(s, wordDict):
word_set = set(wordDict)
@functools.lru_cache(maxsize=None)
def can_break(start):
# 从位置start开始的子串能否拆分
if start == len(s):
return True
for end in range(start + 1, len(s) + 1):
word = s[start:end]
if word in word_set and can_break(end):
return True
return False
return can_break(0)
缓存键的设计
缓存键必须唯一标识状态,同时要考虑效率:
# 1. 元组作为键(适用于固定维度)
@functools.lru_cache(maxsize=None)
def dfs(i, j, k):
pass
# 2. 字符串拼接(简单但可能低效)
cache = {}
key = f"{i}_{j}_{state}"
# 3. 位运算压缩(适用于布尔状态集合)
def dfs(row, col, visited_mask):
key = (row, col, visited_mask)
if key in cache:
return cache[key]
# 4. 冻结集合(适用于无序集合)
def dfs(current, visited_set):
key = (current, frozenset(visited_set))
记忆化搜索 vs 传统DP
两者本质上解决相同的问题,但有不同的特点:
| 特征 | 记忆化搜索 | 传统DP |
| 特征 | 记忆化搜索 | 传统DP |
|---|---|---|
| 思维方式 | 自顶向下 | 自底向上 |
| 实现难度 | 更直观 | 需要确定计算顺序 |
| 空间效率 | 只计算需要的状态 | 计算所有状态 |
| 代码风格 | 递归,函数式 | 迭代,命令式 |
| 调试 | 更容易理解调用栈 | 需要理解状态转移 |
高级例题:戳气球 (#312)
这题展示了记忆化搜索在区间DP中的应用:
def maxCoins(nums):
# 添加虚拟边界
nums = [1] + nums + [1]
n = len(nums)
@functools.lru_cache(maxsize=None)
def dp(left, right):
# 戳破(left, right)开区间内所有气球的最大收益
if left + 1 == right:
return 0
max_coins = 0
# 枚举最后戳破的气球
for i in range(left + 1, right):
coins = nums[left] * nums[i] * nums[right]
coins += dp(left, i) + dp(i, right)
max_coins = max(max_coins, coins)
return max_coins
return dp(0, n - 1)
关键洞察:
- 定义开区间避免边界问题
- 枚举最后戳破的气球,而不是第一个
- 子问题独立性通过"最后操作"保证
记忆化搜索的适用场景
记忆化搜索特别适合:
- 状态空间稀疏:不是所有状态都会被访问
- 递归结构自然:问题本身有递归定义
- 难以确定计算顺序:如树形DP、区间DP
- 需要剪枝优化:可以在递归中提前返回
不适合的场景:
- 状态空间密集且规则
- 空间限制严格
- 需要滚动数组优化
本章小结
BFS vs DFS的选择原则
| 使用BFS | 使用DFS |
| 使用BFS | 使用DFS |
|---|---|
| 最短路径问题 | 所有路径问题 |
| 层序遍历 | 连通性检测 |
| 状态空间小 | 需要回溯 |
| 需要最优解 | 需要所有解 |
核心思维模式
- BFS = 队列 + 层级:水波纹扩散,保证最短
- DFS = 递归 + 回溯:深度探索,穷尽可能
- 双向BFS = 相遇思想:指数级优化
- 记忆化 = DFS + 缓存:避免重复计算
通用模板总结
# BFS模板
def bfs(start):
queue = collections.deque([start])
visited = {start}
while queue:
node = queue.popleft()
for neighbor in get_neighbors(node):
if neighbor not in visited:
visited.add(neighbor)
queue.append(neighbor)
# DFS模板
def dfs(node, visited):
visited.add(node)
for neighbor in get_neighbors(node):
if neighbor not in visited:
dfs(neighbor, visited)
# 记忆化模板
@functools.lru_cache(maxsize=None)
def memo_dfs(state):
if is_base(state):
return base_value
return optimize(memo_dfs(next_state) for next_state in get_next(state))
常见陷阱与错误
1. 访问标记的时机错误
错误示例:
# 错误:出队时才标记
while queue:
node = queue.popleft()
if node in visited:
continue
visited.add(node) # 太晚了!
正确做法:入队时就标记,避免重复入队。
2. DFS栈溢出
问题:Python默认递归深度限制约1000。
解决方案:
import sys
sys.setrecursionlimit(10000) # 提高限制
# 或使用迭代版本
def dfs_iterative(start):
stack = [start]
while stack:
node = stack.pop()
# ...
3. 状态表示不当
错误:使用可变对象作为缓存键
cache[(node, [1,2,3])] # 错误:列表不可哈希
正确:使用不可变类型
cache[(node, tuple([1,2,3]))] # 正确
cache[(node, frozenset({1,2,3}))] # 对于集合
4. 双向BFS的同步问题
陷阱:两个方向使用同一个visited集合可能导致错过相遇点。
解决:分别维护visited集合,相遇检查用frontier集合。
5. 记忆化的副作用
问题:函数有副作用时不能直接记忆化。
# 错误:修改了外部状态
@functools.lru_cache
def dfs(node):
result.append(node) # 副作用!
return something
解决:保持函数纯粹,返回所需信息。
6. 边界条件遗漏
常见遗漏:
- 空图或空树
- 只有一个节点
- 起点即终点
- 无解的情况
最佳实践:先处理边界情况,再进入主逻辑。