数据结构——堆排序

本文介绍堆排序的思想和代码


相关基本概念

  • 完全二叉树 :除最后一层外,每层节点都填满;最后一层节点从左到右排列
    • 完全二叉树的核心性质 :任意节点索引为 i 的子节点为:
      • 左子节点索引 2*i+1
      • 右子节点索引 2*i+2
  • 大顶堆(最大堆) :每个父节点的值 ≥ 其子节点的值(堆排序常用)
    • 注:小顶堆则与大顶堆相反,父节点的值小于其他子节点
  • 堆排序是一种基于完全二叉树结构(堆)的高效排序算法
    • 时间复杂度稳定为 \(O(n \log n)\),且是原地排序(空间复杂度 \(O(1)\))

基本思想

  • 堆排序实现步骤:
    • 第一步:初始化生成最大堆(heapify)
      • 功能:将无序数组转化为大顶堆(根节点是整个数组的最大值)
      • 做法:从倒数第一个非叶节点开始,按照下虑操作找到该节点在最大堆中的位置,逐步遍历每一个非叶结点执行上述操作,生成最大堆
    • 第二步:交换最大值到末尾
      • 功能:将堆顶的最大值放到数组末尾(确定一个元素的最终位置)
      • 做法:交换根节点与最后一个元素的值,树节点减少1(刚才根节点是当前树的最大值,更换以后就是有序的元素了,不再参与最大堆调整)
    • 第三步:重新生成最大堆(调整根节点)
      • 功能:对剩余未排序的元素重新调整为大顶堆
      • 做法:调整新换上来的根节点(下虑操作),找到其应该在的地方,重新生成最大堆
    • 循环执行:循环 第二步 和 第三步,直到最大堆只剩下一个元素

总结

  1. 堆排序的核心是构建大顶堆 + 逐步提取堆顶最大值,利用堆的特性实现高效排序
  2. 堆化(heapify)是关键操作,作用是修复不符合大顶堆规则的子树,时间复杂度为 $O(\log n)$
  3. 堆排序的优势是时间复杂度稳定(不受数据分布影响),缺点是不稳定(相同值的元素可能改变相对位置)

堆排序关键子步骤:堆化(Heapify)

  • 堆化是堆排序的核心操作,作用是修复不符合大顶堆规则的子树
    • 输入:数组、堆的大小、需要调整的父节点索引 i
    • 逻辑:
      • 1)先假设当前父节点 i 是最大值节点
      • 2)核心 :找到 i 的左子节点(索引 2*i+1)和右子节点(索引 2*i+2
      • 3)比较父节点与左右子节点,找到最大值的索引 largest
      • 4)如果 largest != i(父节点不是最大值),交换两者的值,使得父节点变成最大值;然后并递归调整被交换的子节点(因为交换后子树可能不符合堆规则)

堆排序 Python 代码实现(带详细注释)

  • 代码实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    def heap_sort(arr):
    """
    堆排序(升序)实现
    :param arr: 待排序的数组(列表)
    """
    n = len(arr)

    # 第一步:构建大顶堆(从最后一个非叶子节点开始向前遍历)
    # 最后一个非叶子节点的索引:n//2 - 1(完全二叉树特性)
    for i in range(n // 2 - 1, -1, -1):
    heapify(arr, n, i)

    # 第二步:逐个取出堆顶元素(最大值),放到数组末尾
    for i in range(n - 1, 0, -1):
    # 交换堆顶(arr[0])和当前未排序部分的末尾(arr[i])
    arr[0], arr[i] = arr[i], arr[0]
    # 对剩余未排序的元素(0到i-1)重新堆化(堆大小变为i)
    heapify(arr, i, 0)

    def heapify(arr, heap_size, parent_idx):
    """
    堆化函数:调整以 parent_idx 为根的子树为大顶堆
    :param arr: 待调整的数组
    :param heap_size: 当前堆的有效大小(未排序部分的长度)
    :param parent_idx: 需要调整的父节点索引
    """
    largest = parent_idx # 初始化最大值为父节点
    left_child = 2 * parent_idx + 1 # 左子节点索引
    right_child = 2 * parent_idx + 2 # 右子节点索引

    # 1. 比较父节点与左子节点,更新最大值索引
    if left_child < heap_size and arr[left_child] > arr[largest]:
    largest = left_child

    # 2. 比较当前最大值与右子节点,更新最大值索引
    if right_child < heap_size and arr[right_child] > arr[largest]:
    largest = right_child

    # 3. 如果最大值不是父节点,交换并递归调整子树
    if largest != parent_idx:
    arr[parent_idx], arr[largest] = arr[largest], arr[parent_idx]
    # 递归调整被交换的子节点(因为交换后子树可能不符合堆规则)
    heapify(arr, heap_size, largest) # largest 是刚换下来的父节点,此时作为父节点继续判断是否满足最大堆

    # 测试示例
    if __name__ == "__main__":
    # 待排序数组
    test_arr = [12, 11, 13, 5, 6, 7]
    print("排序前数组:", test_arr)

    heap_sort(test_arr)

    print("排序后数组:", test_arr)
  • 代码中关键逻辑解释:

    • 1)构建大顶堆
      • 从最后一个非叶子节点(n//2 -1)开始向前遍历,因为叶子节点本身就是堆(单个节点满足堆的性质),无需调整
      • 每个节点通过 heapify 函数调整为大顶堆,最终整个数组变成大顶堆(根节点是最大值)
    • 2)交换堆顶与末尾元素
      • 堆顶(arr[0])是当前未排序部分的最大值,交换到数组末尾后,该元素的位置就固定了
      • 每次交换后,堆的有效大小减 1(因为末尾元素已排序),重新对剩余元素堆化
    • 3)堆化函数的递归调整
      • 交换父节点和子节点后,子节点所在的子树可能不再是大顶堆,因此需要递归调整该子树

附录:数据流中的中位数实现

  • 题目实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    # -*- coding:utf-8 -*-
    class Solution:
    def __init__(self):
    self.left_max = []
    self.right_min = []

    def Insert(self, num):
    # write code here
    if not self.left_max or num < self.left_max[0]:
    self.left_max.append(num)
    Solution.heapify_up_max(self.left_max, len(self.left_max)-1)
    if len(self.left_max) > len(self.right_min) + 1:
    mid = Solution.pop_max(self.left_max)
    self.right_min.append(mid)
    Solution.heapify_up_min(self.right_min, len(self.right_min)-1)
    else:
    self.right_min.append(num)
    Solution.heapify_up_min(self.right_min, len(self.right_min)-1)
    if len(self.right_min) > len(self.left_max)+1:
    mid = Solution.pop_min(self.right_min)
    self.left_max.append(mid)
    Solution.heapify_up_max(self.left_max, len(self.left_max)-1)
    pass

    def GetMedian(self):
    # write code here
    if len(self.right_min) > len(self.left_max):
    return self.right_min[0]
    elif len(self.right_min) < len(self.left_max):
    return self.left_max[0]
    else:
    return (self.right_min[0] + self.left_max[0])/2
    pass

    @staticmethod
    def heapify_up_max(a, i):
    curr = i
    while (curr-1)//2 >= 0:
    if a[curr] > a[(curr-1)//2]:
    a[(curr-1)//2], a[curr] = a[curr], a[(curr-1)//2]
    else:
    break
    curr = (curr-1)//2

    @staticmethod
    def heapify_up_min(a, i):
    curr = i
    while (curr-1)//2 >= 0:
    if a[curr] < a[(curr-1)//2]:
    a[(curr-1)//2], a[curr] = a[curr], a[(curr-1)//2]
    else:
    break
    curr = (curr-1)//2

    @staticmethod
    def heapify_down_max(a, i):
    maxi = i
    if i*2+1 < len(a) and a[i*2+1] > a[maxi]:
    maxi = i*2+1
    if i*2+2 < len(a) and a[i*2+2] > a[maxi]:
    maxi = i*2+2
    if maxi != i:
    a[maxi], a[i] = a[i], a[maxi]
    Solution.heapify_down_max(a, maxi)

    @staticmethod
    def heapify_down_min(a, i):
    mini = i
    if i*2+1 < len(a) and a[i*2+1] < a[mini]:
    mini = i*2+1
    if i*2+2 < len(a) and a[i*2+2] < a[mini]:
    mini = i*2+2
    if mini != i:
    a[mini], a[i] = a[i], a[mini]
    Solution.heapify_down_min(a, mini)

    @staticmethod
    def pop_min(a):
    result = a[0]
    a[0] = a.pop()
    Solution.heapify_down_min(a, 0)
    return result

    @staticmethod
    def pop_max(a):
    result = a[0]
    a[0] = a.pop()
    Solution.heapify_down_max(a, 0)
    return result