引言 (Introduction)
在前六篇中,我们遍历了从基础二叉树到自平衡 BST、再到线段树的整个谱系。这些树结构都有一个共同特点:节点间有严格的大小比较关系(BST)或区间层次关系(线段树)。而本篇要探讨的堆(Heap)则另辟蹊径——它只关心极值(最大值或最小值)。
堆被广泛用于以下场景:
- 优先队列(操作系统进程调度、Dijkstra 最短路径)
- Top-K 问题(找出最大的 K 个元素)
- 堆排序(原地 $O(N \log N)$ 排序)
- 数据流中位数(实时维护中位数)
本文作为系列终篇,不仅将完整剖析堆的原理与实现,还会在结尾回顾全部七篇文章的知识脉络。
1. 堆的定义与数组表示
1.1 结构与性质
堆是一棵满足堆性质的完全二叉树:
- 最大堆(Max-Heap):任意节点的值 $\ge$ 其所有子节点的值(根节点是最大值)。
- 最小堆(Min-Heap):任意节点的值 $\le$ 其所有子节点的值(根节点是最小值)。
由于是完全二叉树,堆天然适合用数组存储(与上一篇线段树同样的数组映射):
索引 i 的左孩子:left = 2 * i + 1
索引 i 的右孩子:right = 2 * i + 2
索引 i 的父节点:parent = (i - 1) // 2
数组: [100, 36, 19, 25, 17, 3, 7, 1, 2]
索引: 0 1 2 3 4 5 6 7 8
树形视图:
100(0)
/ \
36(1) 19(2)
/ \ / \
25(3) 17(4) 3(5) 7(6)
/ \
1(7) 2(8)
1.2 数据结构定义
from typing import List, Optional
class MaxHeap:
"""最大堆实现"""
def __init__(self, data: Optional[List[int]] = None):
self._heap: List[int] = data[:] if data else []
@staticmethod
def _parent(idx: int) -> int:
return (idx - 1) // 2
@staticmethod
def _left(idx: int) -> int:
return 2 * idx + 1
@staticmethod
def _right(idx: int) -> int:
return 2 * idx + 2
def __len__(self) -> int:
return len(self._heap)
def peek(self) -> Optional[int]:
"""返回堆顶元素(最大值),不删除"""
return self._heap[0] if self._heap else None
2. 核心操作
2.1 上浮 (Sift Up) 与插入
插入新元素时,将其追加到数组末尾,然后不断与父节点比较并交换,直到堆性质恢复:
def push(self, val: int) -> None:
"""插入元素并上浮至正确位置"""
self._heap.append(val)
self._sift_up(len(self._heap) - 1)
def _sift_up(self, idx: int) -> None:
"""自底向上调整,恢复最大堆性质"""
while idx > 0:
p = self._parent(idx)
if self._heap[idx] <= self._heap[p]:
break # 已满足堆性质
self._heap[idx], self._heap[p] = self._heap[p], self._heap[idx]
idx = p
2.2 下沉 (Sift Down) 与删除堆顶
删除堆顶时,将最后一个元素移到堆顶,然后不断与较大的子节点交换,直到堆性质恢复:
def pop(self) -> Optional[int]:
"""删除并返回堆顶元素(最大值)"""
if not self._heap:
return None
if len(self._heap) == 1:
return self._heap.pop()
root = self._heap[0]
self._heap[0] = self._heap.pop() # 末尾元素移到堆顶
self._sift_down(0, len(self._heap))
return root
def _sift_down(self, idx: int, size: int) -> None:
"""自顶向下调整,恢复最大堆性质"""
while True:
largest = idx
left = self._left(idx)
right = self._right(idx)
if left < size and self._heap[left] > self._heap[largest]:
largest = left
if right < size and self._heap[right] > self._heap[largest]:
largest = right
if largest == idx:
break # 已满足堆性质
self._heap[idx], self._heap[largest] = self._heap[largest], self._heap[idx]
idx = largest
复杂度分析:push 与 pop 均需沿树高至多移动 $\log N$ 次,每次比较/交换 $O(1)$,因此时间复杂度为 $O(\log N)$,空间复杂度 $O(1)$。
3. 建堆 (Heapify) 与堆排序
3.1 $O(N)$ 建堆
从一个无序数组构建堆,朴素做法是逐一 push,复杂度 $O(N \log N)$。更优的做法是自底向上对所有非叶子节点做 _sift_down:
@staticmethod
def heapify(arr: List[int]) -> 'MaxHeap':
"""O(N) 时间将无序数组转化为最大堆"""
heap = MaxHeap(arr)
# 从最后一个非叶子节点开始,向前遍历
last_parent = MaxHeap._parent(len(arr) - 1)
for i in range(last_parent, -1, -1):
heap._sift_down(i, len(arr))
return heap
3.2 为什么是 $O(N)$?
直觉上自顶向下建堆似乎是 $O(N \log N)$,但实际上自底向上建堆的复杂度为 $O(N)$。证明如下:
设树高为 $h \approx \log_2 N$。在高度为 $k$ 的层上,约有 $\lceil N / 2^{k+1} \rceil$ 个节点,每个节点的 _sift_down 代价至多为 $O(k)$。总代价:
无穷级数 $\sum_{k=0}^{\infty} k / 2^{k+1} = 1$ 收敛,故 $T(N) = O(N)$。越是靠近底层,节点越多,但 _sift_down 的步数越少,大部分节点只需常数时间。
3.3 堆排序
利用最大堆实现原地排序:
@staticmethod
def heap_sort(arr: List[int]) -> List[int]:
"""堆排序:O(N log N),原地排序"""
heap = MaxHeap.heapify(arr)
size = len(heap._heap)
for i in range(size - 1, 0, -1):
# 1. 堆顶(最大值)与末尾元素交换
heap._heap[0], heap._heap[i] = heap._heap[i], heap._heap[0]
# 2. 堆的有效范围缩小,下沉恢复堆性质
heap._sift_down(0, i)
return heap._heap
4. Python heapq 模块实战
Python 标准库 heapq 实现的是最小堆。核心 API:
import heapq
# ---- 最小堆 ----
heap = []
heapq.heappush(heap, 5) # [5]
heapq.heappush(heap, 2) # [2, 5]
heapq.heappush(heap, 8) # [2, 5, 8]
val = heapq.heappop(heap) # 2 (最小值出堆)
# ---- 最大堆技巧:插入负值 ----
max_heap = []
heapq.heappush(max_heap, -5)
heapq.heappush(max_heap, -2)
max_val = -heapq.heappop(max_heap) # 5
# ---- 堆化已有列表 ----
arr = [5, 2, 8, 1]
heapq.heapify(arr) # O(N) 原地建堆
# ---- Top-K: nlargest / nsmallest ----
top3 = heapq.nlargest(3, arr) # [8, 5, 2]
bot3 = heapq.nsmallest(3, arr) # [1, 2, 5]
多字段优先级:当元素自身不能直接比较大小时,使用元组 (priority, item):
import heapq
class Task:
def __init__(self, name: str, priority: int):
self.name = name
self.priority = priority
def __repr__(self):
return f"Task({self.name}, {self.priority})"
pq: List[tuple[int, Task]] = []
heapq.heappush(pq, (3, Task("C", 3)))
heapq.heappush(pq, (1, Task("A", 1)))
heapq.heappush(pq, (2, Task("B", 2)))
while pq:
_, task = heapq.heappop(pq) # A, B, C (按优先级升序)
5. LeetCode 实战应用
5.1 #215 数组中的第 K 个最大元素 (Kth Largest Element)
import heapq
from typing import List
def find_kth_largest(nums: List[int], k: int) -> int:
"""LeetCode 215: 最小堆维护 Top-K"""
heap = []
for num in nums:
heapq.heappush(heap, num)
if len(heap) > k:
heapq.heappop(heap) # 弹出当前第 K+1 大的元素
return heap[0] # 堆顶即第 K 大
def find_kth_largest_heapify(nums: List[int], k: int) -> int:
"""LeetCode 215: heapify + k 次弹出"""
heapq.heapify(nums)
# 将所有值取反模拟最大堆,弹出 k 次
# 或直接弹出 len-k 个最小值
for _ in range(len(nums) - k):
heapq.heappop(nums)
return nums[0]
复杂度对比:
- 最小堆维护 Top-K:时间 $O(N \log K)$,空间 $O(K)$(适合 $K$ 小的情况)
- Heapify + 弹出:时间 $O(N + N \log N)$,空间 $O(1)$
5.2 #295 数据流的中位数 (Find Median from Data Stream)
题目描述:设计数据结构,支持动态添加数值,并随时返回当前所有数值的中位数。
使用双堆技巧(Two-Heap Technique)——这是堆最精彩的应用之一:
import heapq
class MedianFinder:
"""LeetCode 295: 最大堆(左半)+ 最小堆(右半)"""
def __init__(self):
self.left = [] # 最大堆(存较小的一半,用负值模拟)
self.right = [] # 最小堆(存较大的一半)
def addNum(self, num: int) -> None:
# 1. 先推入左半(最大堆),再将左半最大值推入右半(最小堆)
heapq.heappush(self.left, -num)
heapq.heappush(self.right, -heapq.heappop(self.left))
# 2. 保证左右规模均衡:左 >= 右,且差不超过 1
if len(self.left) < len(self.right):
heapq.heappush(self.left, -heapq.heappop(self.right))
def findMedian(self) -> float:
if len(self.left) > len(self.right):
return float(-self.left[0]) # 奇数,取左半最大值
return (-self.left[0] + self.right[0]) / 2.0 # 偶数,取均值
核心思想:
left(最大堆):存储所有数中较小的一半,方便取最大值。right(最小堆):存储较大的一半,方便取最小值。- 中位数 =
left的最大值(奇数)或(left_max + right_min) / 2(偶数)。
5.3 #23 合并 K 个升序链表 (Merge k Sorted Lists)
import heapq
from typing import List, Optional
class ListNode:
def __init__(self, val: int = 0, next: 'Optional[ListNode]' = None):
self.val = val
self.next = next
def merge_k_lists(lists: List[Optional[ListNode]]) -> Optional[ListNode]:
"""LeetCode 23: 最小堆合并 K 个有序链表"""
heap: List[tuple[int, int, ListNode]] = []
# 1. 将每个链表的头节点推入堆(用 id 作为 tie-breaker)
for i, head in enumerate(lists):
if head:
heapq.heappush(heap, (head.val, i, head))
dummy = ListNode()
tail = dummy
# 2. 每次取出最小节点,追加到结果链表,再将其后继推入堆
while heap:
_, i, node = heapq.heappop(heap)
tail.next = node
tail = node
if node.next:
heapq.heappush(heap, (node.next.val, i, node.next))
return dummy.next
复杂度:设总节点数为 $N$,链表数为 $K$。堆中至多维持 $K$ 个元素,每次 heappush/heappop 为 $O(\log K)$,总复杂度 $O(N \log K)$。
结论 (Conclusion)
堆的核心要点
堆放弃了对全局有序的追求,只专注于「极值快速获取」这一单一需求。正是这种精简,使得堆在所有基于比较的 Top-K 求解中达到理论最优——用一个大小为 $K$ 的堆,我们只需 $O(N \log K)$ 时间就可以从 $N$ 个元素中找到前 $K$ 大(或小)的元素,而不需要 $O(N \log N)$ 的全排序。
系列回顾:二叉树的算法宇宙
| 篇章 | 主题 | 核心洞察 |
|---|---|---|
| (一) | 基础遍历与结构属性 | DFS/BFS 是一切树算法的根基;递归栈状态是理解遍历的关键。 |
| (二) | BST 与路径逻辑 | 有序约束将查找优化到 $O(\log N)$;路径问题本质是回溯与边界传递。 |
| (三) | 复杂结构与空间优化 | 全局状态管理区分局部/全局最优;Morris 遍历用闲置指针压榨空间。 |
| (四) | AVL 树与自平衡 | 旋转是平衡的钥匙;四种失衡场景即 LL/RR/LR/RL。 |
| (五) | 红黑树与工程对比 | 放弃严格平衡换取更少旋转;5 性质 + 6 cases 实现近似平衡。 |
| (六) | 线段树与区间查询 | 分治预计算将区间查询降为 $O(\log N)$;延迟传播是批量操作的精髓。 |
| (七) | 堆与优先队列 | 只关注极值,极致简化;双堆技巧是中位数流问题的经典解法。 |
从第一篇的 DFS 递归,到这篇的堆排序,我们横跨了二叉树算法的整个经纬。递归与迭代、平衡与性能、局部与全局——这些主题贯穿始终。二叉树不仅是一种数据结构,更是一种分治与递推的思维范式。愿本系列能成为你数据结构之旅的一块踏脚石。