Python——并发模型

本文介绍Python中的并发机制,并给出一种最简洁的Python并发库


使用协程(yield语句)

  • 实现随时暂停和开始
  • 完全串行的操作,无法实现时间上的并行,这里指的是不能同时进行某个操作
  • 与Go语言的协程不同,Python的协程更像是一个“生成器”

使用线程

  • 参考threading模块实现自己的线程

使用concurrent.futures

实现线程池模型

  • 实现一般的线程池模型,代码如下,关键代码仅仅两行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import time
    from concurrent import futures

    def sleep_one_second(key):
    time.sleep(1)
    return "[%s]Done" % key

    ml = "ABCDEFGHIJ"
    with futures.ThreadPoolExecutor(10) as executor:
    res = executor.map(sleep_one_second, ml)

    print([r for r in res])
  • 上面的代码可以在一秒内执行完成,因为共有10个线程并发

  • 在实现爬虫程序时,如果需要爬取的某些数据是相对独立的,那么我们完全可以用线程池实现,而不用使用复杂的线程模块*

实现进程池模型

  • 仅仅需要修改futures.ThreadPoolExecutor为futures.ProcessPoolExecutor即可
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import time
    from concurrent import futures

    def sleep_one_second(key):
    time.sleep(1)
    return "[%s]Done" % key

    ml = "ABCDEFGHIJ"
    with futures.ProcessPoolExecutor(10) as executor:
    res = executor.map(sleep_one_second, ml)

    print([r for r in res])

进程与线程内存区别

对全局变量的访问对比
  • 线程:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    from concurrent import futures
    global_list = []

    def test_futures(range_num):
    global_list.append(range_num)
    print global_list
    return range_num

    with futures.ThreadPoolExecutor(8) as executor:
    res = executor.map(test_futures, range(10))

    print "the final global_list: %s" % global_list
    • 上面的代码输出如下:

      [0]
      [0, [10, 2]
      , 1[0, 1, 2, , 32]
      , 3, [04, 1], 2, 3, 4
      [0, , 5]
      1, [0, 21, 2, [3, , 3, 044, , 51, , 6, , 75]
      2, [0, 1, 63, 7, , 2, 3[40, , 8, , 4, 9, 155, , 6, 2]6, , 7
      , 8, 7, 9, ]3
      8, 4, , 59, 6, ]
      7, 8, 9]
      the final global_list: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
      the results: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

  • 进程:

from concurrent import futures
global_list = []

def test_futures(range_num):
    global_list.append(range_num)
    print global_list
    return range_num

with futures.ProcessPoolExecutor(8) as executor:
    res = executor.map(test_futures, range(10))

print "the final global_list: %s" % global_list
    • 上面的代码输出如下:

      [0]
      [1]
      [2]
      [3]
      [0, 4]
      [5]
      [6]
      [7]
      [1, 8]
      [2, 9]
      the final global_list: []
      the results: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

  • 原因分析

    • 线程之间共享地址空间,所以所有的线程线程访问同一个全局共享变量
    • 进程之间不共享地址空间,所以不同进程访问不同共享变量
    • 在程序中Process之间不共享地址空间,但是futures.ProcessPoolExecutor(max_workers)任务分配时受限与参数max_workers的影响,所以可以预估本地机器最多开启max_workers个进程,同一进程中地址空间共享,所以会有部分任务被分配给同一进程的不同线程,从而出现部分共享变量被不同任务访问到
    • 总结:
      • futures.ThreadPoolExecutor单进程多线程中全局变量共享
      • futures.ProcessPoolExecutor多进程多线程中每个进程内部的线程全局变量共享
      • 不同进程之间即使时全局变量也不能共享

Python中进程 VS 线程

  • Python中由于全局解释器锁(GIL)的存在,同一进程中的所有线程使用同一个解释器对象,所以它们无法真正实现并行
  • 所以在想要充分利用多核的时候,需要选择使用多进程
  • 更多信息参考Process和Thread分析