Python学习历程——线程、进程、队列

在这里插入图片描述

1. 基础概念:并发与并行

1.1. 什么是并发 (Concurrency)?

  • 概念:在单个处理器上交替执行多个任务。
  • 目的:提高程序响应速度和资源利用率(尤其是在等待 I/O 时)。

交替执行多个任务,并不是同时开始。

在这里插入图片描述

1.2. 什么是并行 (Parallelism)?

  • 概念:在多个处理器上同时执行多个任务。
  • 目的:真正地缩短计算密集型任务的总执行时间。

同时开始多个任务,依赖多核CPU。

在这里插入图片描述

1.3. 关键问题类型:CPU 密集型 vs I/O 密集型

  • CPU 密集型 (CPU-bound): 任务主要是计算,如视频编码、科学计算。
  • I/O 密集型 (I/O-bound): 任务主要是在等待外部资源,如文件读写、网络请求。

1.4. Python 中的进程与线程

  • 进程 (Process): 操作系统资源分配的基本单位,拥有独立的内存空间。
  • 线程 (Thread): CPU 调度的基本单位,共享所属进程的内存空间。

用Tomcat来说,一个Tomcat就是一个进程,而线程就是Java中的一个Thread。

在Java中我们通常会使用Spring的线程池来管理多线程问题,一个系统中架构师都会统一管理线程池的配置,最大并发量是多少,线程的摧毁和创建都会统一管理。当然我们也可以自己创建一个线程池,不过使用时要注意系统的配置,一般情况下web模块会在配置文件中配置最大的线程数量,一般都是基于Tomcat最大的线程数量来配置的,这个具体根据服务器的CPU性能来决定。

1.5 Python和Java对比(重点)

核心模式差异

  • Python:Python使用的是操作系统的原生线程,但受全局解释器(GIL)的限制,简单来说就是在同一时刻只能执行一个Python的字节码。这个限制导致了Python多线程无法实现CPU的密集型任务的并行,但非常适合IO密集型任务的并发。
  • Java:Java同样也有GIL,但是它不受限制,JVM完全支持我们执行CPU的密集型任务并发,所以它对IO 和 CPU 密集型任务的并发都非常支持。

理念差异

  • Python:由于GIL的限制,Python明确规定,使用多线程实现IO的密集型任务,使用多进程实现CPU的密集型任务
  • Java:Java 则统一使用多线程模型来应对这两种任务,这也是 Java 在大型、高并发企业级应用中占据主导地位的原因之一

2. 深入多进程:multiprocessing 模块

2.1. 核心概念:为何使用多进程?

  • 优势:利用多核 CPU 实现真正的并行计算,完美应对 CPU 密集型任务。
  • 缺点:资源开销大,进程间通信 (IPC) 相对复杂。

计算机本身就是支持并发的,而并行是通过CPU的多核来执行的,在有些特殊场景下并行是非常有必要的,目前我经常使用且掌握的并行场景有两个,其一是在数据同步时通过并行实现高效的同步,减少程序运行时间,其二是在某些数据统计和拉取时使用并行对多个账号或者多个维度同时操作。

为什么在有些时候必须要使用到并行,一般的web请求都是有最大的相应时长的,一旦你的程序运行时间过长,你的前台很可能会超时,而且长时间占据大部分的服务器资源也是非常不好的。

2.2. 创建和管理进程

  • multiprocessing.Process 类的使用
    • target: 指定执行函数
    • args, kwargs: 传递参数
    • p.start(): 启动进程
    • p.join(): 等待进程结束
    • p.is_alive(): 检查进程状态
    • p.pid: 获取进程 ID
import multiprocessing
import os
import time

def worker(num):
    """子进程执行的函数"""
    print(f"Worker {num}: Process ID {os.getpid()}")
    time.sleep(2)
    print(f"Worker {num} finished.")

if __name__ == '__main__':
    processes = []
    for i in range(3):
        # args=(i,) 表示单元组
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start() # 启动进程

    print("Main process waiting for all child processes to complete.")
    for p in processes:
        p.join() # 等待子进程结束

    print("All processes finished.")

注意这个例子,有几个点容易混淆

  1. p.start 就已经开始进程的执行了。
  2. p.join 的作用只是为了阻塞主程序,等待所有的进程执行结束。
  3. 目前的例子来看是细粒度的进程控制,子进程是并行的,但是结果处理是串行的。

下面的例子是接近Java一般的粗粒度控制

import multiprocessing
import os
import time

def worker(num):
    """子进程执行的函数"""
    print(f"Worker {num}: Process ID {os.getpid()}")
    time.sleep(num + 1)  # 不同执行时间
    return f"Worker {num} completed in {num + 1}s"

if __name__ == '__main__':
    # 创建进程池
    with multiprocessing.Pool(processes=3) as pool:
        print("提交异步任务...")
        
        # 异步提交所有任务
        async_results = [
            pool.apply_async(worker, (i,)) 
            for i in range(3)
        ]
        
        print("主程序可以做其他事情...")
        time.sleep(1)
        print("主程序检查任务进度...")
        
        # 统一等待所有任务完成
        results = []
        for i, async_result in enumerate(async_results):
            try:
                result = async_result.get(timeout=10)  # 设置超时
                results.append(result)
                print(f"任务{i}完成: {result}")
            except multiprocessing.TimeoutError:
                print(f"任务{i}超时")
            except Exception as e:
                print(f"任务{i}失败: {e}")
    
    print("所有进程完成")
    print("最终结果:", results)

上面的两个例子其实都是串行的处理结果,其实要真正做到并行处理结果应该是回调函数的写法(只是其一,还有更高级的队列实现)

from concurrent.futures import ProcessPoolExecutor
import time

def worker(number, duration):
    print(f"Worker {number} 开始")
    time.sleep(duration)
    return f"Worker {number} 的结果(耗时{duration}秒)"

def callback(future):
    """结果就绪时立即回调"""
    result = future.result()
    print(f"回调收到: {result}")
    # 可以立即处理结果

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        # 提交任务并绑定回调
        futures = []
        durations = [3, 1, 2]  # 不同执行时间
        
        for i, duration in enumerate(durations):
            future = executor.submit(worker, i, duration)
            future.add_done_callback(callback)  # 绑定回调
            futures.append(future)
        
        print("所有任务已提交,主程序继续...")
        
        # 也可以选择等待所有任务完成(但回调已经处理了结果),这里只是统一的结果处理
        for future in futures:
            future.result()  # 只是为了确保没有异常
    
    print("所有任务完成")

2.3. 进程间通信 (IPC - Inter-Process Communication)

  • multiprocessing.Queue: 进程安全的消息队列,最常用。
  • multiprocessing.Pipe: 管道,用于两个进程间的双向通信。
  • multiprocessing.Manager: 管理共享对象,如 Manager.dict(), Manager.list()
  • 共享内存:multiprocessing.Value, multiprocessing.Array

由于进程内存不共享,需要使用 Queue, Pipe, Manager 等工具来传递数据。其中 multiprocessing.Queue 是最常用的,它是一个进程安全的消息队列。

一般进程之间很少通信,这里我们先了解一下通信的手段,在之后内容学习之后再来解释。

因为Java的线程内存是共享的,一般不存在这种问题,可以直接获取全局变量和修改全局变量。

2.4. 使用进程池 (Pool) 高效管理进程

一般我们都是使用进程池的,方便管理而且稳定,只有在极特殊的场景下我们需要手动管理进程,要不断的释放进程(一个进程结束后),因为进程池一直是占用你初始化的进程数量的,直到with模块结束才释放资源。

  • multiprocessing.Pool: 维护一个固定数量的进程池。
  • pool.apply(): 阻塞式执行。
  • pool.apply_async(): 非阻塞式执行,返回 AsyncResult 对象。
  • pool.map(): 阻塞式,对可迭代对象的每个元素应用函数。
  • pool.map_async(): map 的非阻塞版本。
  • pool.close(): 关闭池,不再接受新任务。
  • pool.join(): 等待所有工作进程退出。
from multiprocessing import Pool
import time

def square(x):
    return x * x

if __name__ == '__main__':
    # 创建一个包含 4 个进程的池
    with Pool(processes=4) as pool:
        # 使用 map 方法,它会将列表中的每个元素分配给池中的一个进程
        results = pool.map(square, range(10))
        print(results)

关于进程池的map方法解析

参数 说明 本例中的值
function 要并行执行的函数 square
iterable 可迭代对象,包含所有输入数据 range(10)
chunksize 每个进程一次处理的数据块大小 默认值

当然你也可以使用内置的map进行,只不过是串行

# 内置map - 串行执行
results = list(map(square, range(10)))
# 顺序执行: square(0) → square(1) → ... → square(9)

# pool.map - 并行执行  
results = pool.map(square, range(10))
# 并行执行: square(0), square(1), square(2)... 同时进行

2.5. 进程同步

  • multiprocessing.Lock: 锁,确保同一时间只有一个进程能访问共享资源。
  • multiprocessing.Event: 事件,一个进程通知其他进程的简单方式。
  • multiprocessing.Semaphore: 信号量,控制同时访问某一资源的进程数量。

Lock锁(卫生间锁门——互斥锁——同一时间只有一个人拉屎)

  • 当多个进程需要访问或修改同一个共享资源时(例如一个文件、一个共享内存变量、一个数据库连接),就必须进行同步,否则会产生“竞态条件”,导致数据损坏或程序逻辑混乱。虽然进程拥有独立的内存空间,但它们仍然可以与外部的、系统级的共享资源进行交互。multiprocessing 模块提供了与 threading 模块几乎完全相同的同步原语,但它们是为进程安全设计的。

核心方法

  • lock.acquire(): 尝试获取锁。如果锁已被其他进程持有,则当前进程会阻塞(等待),直到锁被释放。
  • lock.release(): 释放锁,让等待中的其他进程有机会获取它。

一般来说,我们使用 with lock 即可达到效果,不需要手动控制,它会自动加锁和释放锁。

import multiprocessing
import time
import os

def write_to_file(lock, process_id, content):
    """一个需要独占访问文件的写操作"""
    with lock:
        print(f"进程 {process_id} (PID: {os.getpid()}) 获得了锁,开始写入文件...")
        with open("log.txt", "a") as f:
            f.write(f"来自进程 {process_id} 的日志: {content}\n")
            time.sleep(0.5) # 模拟I/O操作
        print(f"进程 {process_id} 释放了锁。")

if __name__ == "__main__":
    # 在主进程中创建一个锁对象
    lock = multiprocessing.Lock()
    
    # 清理旧的日志文件
    if os.path.exists("log.txt"):
        os.remove("log.txt")

    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=write_to_file, args=(lock, i, f"这是一条消息 {i}"))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
        
    print("\n所有进程执行完毕。查看 log.txt 文件内容。")

Event事件(开始拉屎的信号——来一根)

  • Event 就像一个信号旗或交通灯。它内部有一个标志,初始状态为“红灯”(False)。一个进程可以等待这个信号灯变绿 (event.wait())。另一个进程可以在某个条件满足后,将信号灯设置为“绿灯” (event.set())。一旦信号灯变绿,所有等待它的进程都会被唤醒并继续执行
  • 用于实现一个进程向其他一个或多个进程发送信号,通知它们某个事件已经发生。非常适合用于“启动信号”或“阶段完成”的场景。
import multiprocessing
import time

def worker(event, name):
    """工作进程"""
    print(f"工作进程 {name} 已启动,正在等待启动信号...")
    event.wait() # 等待事件被设置
    print(f"工作进程 {name} 收到信号,开始工作!")
    # ... 执行实际工作 ...

if __name__ == "__main__":
    # 创建一个 Event 对象
    event = multiprocessing.Event()

    # 创建并启动工作进程
    workers = [multiprocessing.Process(target=worker, args=(event, i)) for i in range(3)]
    for w in workers:
        w.start()

    print("主进程正在进行初始化设置...")
    time.sleep(3) # 模拟初始化耗时
    print("主进程初始化完毕,发送启动信号!")
    # 这个意思就是掏出打火机点上的那个步骤
    event.set() # 设置事件,通知所有工作进程

    for w in workers:
        w.join()

    print("所有工作已完成。")

这个例子中Event的作用是通知启动,也可以阶段性终止和启动,不过这里有一个误区“Event”到底算不算进程之间的通信?

首先Event只是一个机制,作用是进程之间的通信,正常来说不涉及数据的传输。
真正的数据通信是 共享变量、传递复杂对象、共享数据结构这种操作。
这里我们看着是共享了一个变量,实际上底层进行了封装,类似下面的伪代码。

# 伪代码理解 Event 的工作原理
class Event:
    def __init__(self):
        self._flag = multiprocessing.Value('b', False)  # 共享的标志位
        self._cond = multiprocessing.Condition()        # 条件变量
    
    def set(self):
        with self._cond:
            self._flag.value = True
            self._cond.notify_all()  # 通知所有等待的进程
    
    def wait(self):
        with self._cond:
            while not self._flag.value:
                self._cond.wait()     # 等待条件满足

Semaphore: 信号量 (公共厕所——可以多个人同时拉屎,没位置就等着)

  • Semaphore 是一个更广义的锁。它内部维护一个计数器。Lock 可以看作是计数器为 1 的 Semaphore。Semaphore 就像一个有 N 把钥匙的房间(或者一个有 N 个停车位的停车场)。进程每次进入 (acquire) 需要消耗一把钥匙(计数器减 1)。如果钥匙没了(计数器为 0),进程就必须等待。进程出来后 (release) 会归还钥匙(计数器加 1)。
  • 用于控制对某一共享资源的并发访问数量。当你有一个资源池(如数据库连接池、API 调用速率限制)可以同时被有限个进程访问时,Semaphore 是完美的选择。

核心方法:

  • Semaphore(n): 初始化一个计数器为 n 的信号量。
  • sem.acquire(): 计数器减 1。如果计数器为 0,则阻塞等待。
  • sem.release(): 计数器加 1。
import multiprocessing
import time
import random

def service_request(semaphore, process_id):
    """模拟向服务发送请求"""
    with semaphore: # 自动 acquire 和 release
        # emaphore.get_value() 获取的是信号量当前的可用许可数量
        print(f"进程 {process_id} 已连接服务 (当前连接数: {2 - semaphore.get_value()}/2)")
        time.sleep(random.uniform(1, 3)) # 模拟处理时间
        print(f"进程 {process_id} 已断开服务")

if __name__ == "__main__":
    # 创建一个值为 2 的信号量,表示最多允许 2 个并发连接
    semaphore = multiprocessing.Semaphore(2)

    processes = []
    for i in range(6): # 我们有 6 个进程想要连接
        p = multiprocessing.Process(target=service_request, args=(semaphore, i))
        processes.append(p)
        p.start()
        time.sleep(0.2) # 错开启动时间,方便观察

    for p in processes:
        p.join()
        
    print("所有请求处理完毕。")

上面这个程序比较简单,只是为了理解概念,真正的使用场景如下:

import multiprocessing
import time
import random
from typing import List

class ConnectionManager:
    """连接管理器,封装信号量逻辑"""
    def __init__(self, max_connections: int):
        self.max_connections = max_connections
        self.semaphore = multiprocessing.Semaphore(max_connections)
        # 创建一个可在进程间共享的整数值
        # 'i' 表示整数类型, 0 是初始值
        self.active_connections = multiprocessing.Value('i', 0)  # 共享计数器
        
    def get_connection_info(self) -> tuple:
        """获取当前连接信息"""
        with self.semaphore._cond:  # 使用信号量的锁来保证原子性
            available = self.semaphore.get_value()
            active = self.max_connections - available
            return active, available

def service_request(conn_manager: ConnectionManager, process_id: int):
    """模拟向服务发送请求"""
    start_time = time.time()
    
    with conn_manager.semaphore:
        # 更新活跃连接数
        # 在锁保护下的代码块
        with conn_manager.active_connections.get_lock():
            conn_manager.active_connections.value += 1
        """
        等价下面的写法
        lock = conn_manager.active_connections.get_lock()
        lock.acquire()  # 获取锁
        try:
          conn_manager.active_connections.value += 1
        finally:
          lock.release()  # 释放锁
        """
        # 获取连接信息
        active_conn, available_conn = conn_manager.get_connection_info()
        wait_time = time.time() - start_time
        
        print(f"[{time.strftime('%H:%M:%S')}] 进程 {process_id} 连接成功 "
              f"(等待: {wait_time:.2f}s, 活跃: {active_conn}/{conn_manager.max_connections})")
        
        # 模拟处理时间
        process_time = random.uniform(1, 3)
        time.sleep(process_time)
        
        # 更新连接数
        with conn_manager.active_connections.get_lock():
            conn_manager.active_connections.value -= 1
        
        print(f"[{time.strftime('%H:%M:%S')}] 进程 {process_id} 处理完成 "
              f"(耗时: {process_time:.2f}s)")

def monitor_connections(conn_manager: ConnectionManager, stop_event: multiprocessing.Event):
    """监控进程,定期输出连接状态"""
    while not stop_event.is_set():
        active_conn, available_conn = conn_manager.get_connection_info()
        print(f"[监控] 当前状态: 活跃连接 {active_conn}, 可用许可 {available_conn}")
        stop_event.wait(1)  # 每秒检查一次

def main():
    # 配置参数
    MAX_CONNECTIONS = 2
    TOTAL_PROCESSES = 6
    START_DELAY = 0.3  # 进程启动间隔
    
    print(f"🚀 启动连接服务 (最大并发: {MAX_CONNECTIONS}, 总进程数: {TOTAL_PROCESSES})")
    print("=" * 50)
    
    # 创建连接管理器和停止事件
    conn_manager = ConnectionManager(MAX_CONNECTIONS)
    stop_event = multiprocessing.Event()
    
    # 启动监控进程
    monitor_process = multiprocessing.Process(
        target=monitor_connections, 
        args=(conn_manager, stop_event)
    )
    monitor_process.start()
    
    # 创建工作进程
    processes: List[multiprocessing.Process] = []
    try:
        for i in range(TOTAL_PROCESSES):
            p = multiprocessing.Process(
                target=service_request, 
                args=(conn_manager, i)
            )
            processes.append(p)
            p.start()
            time.sleep(START_DELAY)  # 错开启动时间
        
        # 等待所有工作进程完成
        for p in processes:
            p.join()
            
    except KeyboardInterrupt:
        print("\n⚠️  收到中断信号,正在停止进程...")
    finally:
        # 停止监控
        stop_event.set()
        monitor_process.join()
        
        # 确保所有进程都结束
        for p in processes:
            if p.is_alive():
                p.terminate()
    
    print("=" * 50)
    print("✅ 所有请求处理完毕")

if __name__ == "__main__":
    main()

这里的监控实现的很巧妙,使用了事件驱动机制,轮询检查Event的状态,如果为False就停止监控,核心使用wait方法阻塞一秒,达到类似定时器的效果,这样的好处是能随时被外界的set方法唤醒监控,也就是有人上厕所会被立马监控,表示没有坑了。

3. 深入多线程:threading 模块

Python的threading模块提供了一套高级接口来创建和管理线程,实现并发执行。然而,由于全局解释器锁(GIL)的存在,CPython中的多线程在CPU密集型任务上无法实现真正的并行(即无法同时利用多核CPU),但它在I/O密集型任务中表现出色,能显著提升程序效率。其线程管理和同步工具(如Lock, Event)在概念上与Java中的Thread, synchronized, Lock, wait/notify等机制非常相似,但底层的执行模型因GIL的存在而有本质区别。

3.1. 全局解释器锁 (GIL - Global Interpreter Lock)

  • 什么是 GIL:CPython 解释器中的一个互斥锁,确保任何时候只有一个线程在执行 Python 字节码
  • GIL 的影响:导致 Python 多线程在 CPU 密集型任务上无法实现并行,但对于 I/O 密集型任务,它会在线程等待 I/O 时释放,从而实现并发。

3.2. 创建和管理线程

  • threading.Thread 类的使用 (与 Process 类似)
    • target, args, kwargs
    • t.start(), t.join()
  • threading.current_thread(): 获取当前线程对象。
  • 守护线程 (daemon=True): 主线程结束时,守护线程随之强制退出。

线程的基本使用

import threading
import time

def worker(name, duration):
    print(f'线程 {name}: 开始')
    time.sleep(duration)
    print(f'线程 {name}: 结束')

# 创建线程
t1 = threading.Thread(target=worker, args=('A', 2), kwargs={}) 
t2 = threading.Thread(target=worker, name='WorkerB', args=('B', 3)) # 也可以通过name参数命名

# 启动线程
t1.start()
t2.start()

# 获取当前线程
main_thread = threading.current_thread()
print(f'主线程 {main_thread.name} 正在运行')

# 等待子线程结束
t1.join()
t2.join()

print('所有线程已结束')

守护线程的作用

t_daemon = threading.Thread(target=worker, args=('Daemon', 5), daemon=True)
t_daemon.start() 
# 主线程不会等待t_daemon,会直接在 "所有线程已结束" 后退出

守护线程 (daemon=True):如果一个线程被设置为守护线程,那么当主线程(以及所有非守护线程)执行完毕后,程序会直接退出,而不会等待守护线程执行完毕。这适用于执行一些无关紧要的后台任务,如心跳检测、日志记录等

多线程在IO密集型任务的表现(实际例子)

任务场景:下载多个网页的内容,使用单线程和多线程。

原理:正常单线程下执行IO任务时会被阻塞(GIL锁开启),如果使用多线程,当我们发起IO请求之后GIL锁会被释放,我们继续执行下一个任务,IO任务自动运行。

单线程版本

import requests
import time
import threading

# 要下载的URL列表
URLS = [
    'https://www.python.org/',
    'https://www.wikipedia.org/',
    'https://github.com/',
    'https://www.youtube.com/',
    'https://www.amazon.com/'
]

def download_url(url):
    """下载单个URL的内容并打印其大小"""
    try:
        start_time = time.time()
        response = requests.get(url, timeout=10)
        # 获取当前线程名
        thread_name = threading.current_thread().name
        elapsed_time = time.time() - start_time
        print(f"[{thread_name}] 下载 {url} 完成,大小: {len(response.content)}字节, 耗时: {elapsed_time:.2f}秒")
    except requests.RequestException as e:
        print(f"下载 {url} 失败: {e}")

# --- 单线程执行 ---
print("--- 开始单线程下载 ---")
single_thread_start_time = time.time()
for url in URLS:
    download_url(url)
single_thread_duration = time.time() - single_thread_start_time
print(f"--- 单线程下载总耗时: {single_thread_duration:.2f}秒 ---\n")

多线程版本(普通实现,不用线程池)

# --- 多线程执行 ---
print("--- 开始多线程下载 ---")
threads = []
multi_thread_start_time = time.time()

for url in URLS:
    # 为每个URL创建一个线程
    thread = threading.Thread(target=download_url, args=(url,))
    threads.append(thread)
    # 启动线程
    thread.start()

# 等待所有线程执行完毕
for thread in threads:
    thread.join()

multi_thread_duration = time.time() - multi_thread_start_time
print(f"--- 多线程下载总耗时: {multi_thread_duration:.2f}秒 ---")

多线程版本(使用线程池)

from concurrent.futures import ThreadPoolExecutor
import requests
import time

# ... download_url 和 URLS 定义同上 ...

print("--- 开始使用线程池下载 ---")
executor_start_time = time.time()
# 创建一个最多包含5个线程的线程池
with ThreadPoolExecutor(max_workers=5) as executor:
    # map方法会自动为URLS中的每个元素启动一个线程,并按输入顺序返回结果
    executor.map(download_url, URLS)
executor_duration = time.time() - executor_start_time
print(f"--- 线程池下载总耗时: {executor_duration:.2f}秒 ---")

核心原理解析

💡 这里我们一定要抓住一个点,GIL锁的释放和开启永远都是CPU密集型任务来控制的,也就是非response = requests.get(url, timeout=10)这一段代码的执行,只有关于IO操作的才会释放GIL锁,从而执行另外一个任务的非IO型代码,依此类推。

💡 也就是说,排除IO任务外,其余代码都是串行执行的,严格意义上来说并不是真正的并发,只是将最耗时的操作进行了并发。

思考:如果并发量实在太大,成千上万,我们还会使用线程池来管理吗?

💡 答案是不会,这种情况创建线程就有些浪费资源了,本身Thread就是会耗费一部分资源,如果并发量太大就显得不好管理,这时候使用异步I/O (asyncio)就很好了,实际上就是异步操作,然后使用回调函数处理结果,这样更能节省资源。

再思考:如果我的非IO任务同样耗时,这时候使用线程还会有优势吗?

💡 答案是不会,如果你的IO密集型任务和CPU密集型任务同样都很耗时,那就需要考虑线程和进程的结合了,可以将多个任务进行分割,比如10000个任务,我们按照200一个批次开一个进程,在这个进程中开启线程处理IO任务,这样计算下来就是最优的结果。

import time
import requests
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

# 假设这是一个非常耗时的CPU密集型任务
def cpu_intensive_analysis(content):
    """模拟复杂的文本分析"""
    # 这里用一个简单的循环来消耗CPU时间
    # 实际应用中可能是复杂的JSON解析、NLP、图像处理等
    total = sum(i for i in range(10**6)) 
    return len(content) + total

# 每个进程中的线程要执行的任务
def download_and_analyze(url):
    """下载并分析单个URL (I/O + CPU)"""
    try:
        response = requests.get(url, timeout=10)
        # CPU密集部分
        result = cpu_intensive_analysis(response.content)
        print(f"处理完成: {url}, 结果: {result}")
        return result
    except requests.RequestException as e:
        print(f"处理失败: {url}, 错误: {e}")
        return None

# 每个进程要执行的主函数
def process_worker(urls_chunk):
    """
    这个函数在单个进程中运行。
    它使用线程池来并发处理分配给它的URL块。
    """
    results = []
    # 在进程内部创建线程池
    with ThreadPoolExecutor(max_workers=10) as thread_executor:
        # 使用map并发执行下载和分析
        results = list(thread_executor.map(download_and_analyze, urls_chunk))
    return results

if __name__ == "__main__":
    URLS = [
        'https://www.python.org/', 'https://www.wikipedia.org/', 'https://github.com/',
        'https://www.youtube.com/', 'https://www.amazon.com/', 'https://www.apple.com/',
        'https://www.microsoft.com/', 'https://www.google.com/'
    ] * 5 # 模拟40个任务

    NUM_PROCESSES = 4 # 假设我们有4核CPU
    
    # 将URL列表分割成多个块,每个块交给一个进程处理
    chunk_size = (len(URLS) + NUM_PROCESSES - 1) // NUM_PROCESSES
    url_chunks = [URLS[i:i + chunk_size] for i in range(0, len(URLS), chunk_size)]

    start_time = time.time()
    
    final_results = []
    # 创建进程池
    with ProcessPoolExecutor(max_workers=NUM_PROCESSES) as process_executor:
        # 将每个URL块提交给进程池中的一个进程
        # process_worker函数将在每个子进程中被调用
        futures = process_executor.map(process_worker, url_chunks)
        for result_chunk in futures:
            final_results.extend(result_chunk)
            
    duration = time.time() - start_time
    print(f"\n任务完成!总共处理了 {len(final_results)} 个URL。")
    print(f"总耗时: {duration:.2f} 秒")

3.3. 线程同步与数据共享

  • 共享数据的问题:竞态条件 (Race Condition)。
  • threading.Lock: 互斥锁,解决竞态条件。
  • threading.RLock: 可重入锁,允许同一线程多次获取锁。
  • threading.Event: 线程间的信号通信。
  • threading.Condition: 条件变量,复杂的生产者-消费者模型。
  • threading.Semaphore: 控制并发线程数量。
  • threading.Barrier: “栅栏”,让多个线程等待,直到所有线程都到达某一点。

竞态条件

💡 当多个线程并发地访问和修改同一个共享数据时,最终的结果取决于线程执行的精确时序,导致结果不可预测,虽然这个概念我们都知道,但实际为什么在代码层面会不一样,其实底层是操作系统的线程抢占

场景:银行账户余额。我们模拟1000个人,每个人同时往同一个账户里存1元钱。初始余额为0,最终余额理应是1000元。

错误代码

import threading
import time

class BankAccount:
    def __init__(self):
        self.balance = 0

def deposit(account, amount):
    """存款操作"""
    # 1. 读取当前余额
    current_balance = account.balance
    # 2. 模拟网络延迟或CPU计算
    time.sleep(0.0001) 
    # 3. 计算新余额
    new_balance = current_balance + amount
    # 4. 写入新余额
    account.balance = new_balance

def run_simulation(account):
    threads = []
    for _ in range(1000):
        # 每个线程都存1元
        thread = threading.Thread(target=deposit, args=(account, 1))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

# --- 运行模拟 ---
account = BankAccount()
run_simulation(account)
print(f"预期余额: 1000, 实际余额: {account.balance}")

💡 根本原因:操作原子性被破坏!!!

account.balance += 1 这个操作在Python中看起来是一行代码,但它在底层不是原子操作。它被分解成了三个步骤:

  1. 读取 (Read):获取 account.balance 的当前值。
  2. 修改 (Modify):在CPU寄存器中将该值加1。
  3. 写入 (Write):将计算出的新值写回 account.balance。

由于线程调度是抢占式的,操作系统可以在这三个步骤之间的任何时刻暂停一个线程,切换到另一个线程。这就导致了“数据覆盖”的灾难:

T1 (线程1) 读取余额,值为 50。
【上下文切换】 T1被暂停,T2开始运行。
T2 (线程2) 读取余额,值也是 50。
T2 计算新余额为 51。
T2 将 51 写入 account.balance。账户余额现在是51。
【上下文切换】 T2完成,T1恢复运行。
T1 完全不知道T2的存在,它继续从自己上次中断的地方开始。它当时读取的值是50,所以它计算出的新余额也是51。
T1 将 51 写入 account.balance。

解决竞态条件的6种方案

方案一:threading.Lock (互斥锁)

Lock是最基础的同步原语,它的哲学是一次只许一人进入

使用互斥锁解决上述的银行问题

class BankAccountWithLock:
    def __init__(self):
        self.balance = 0
        self.lock = threading.Lock()

    def deposit(self):
        with self.lock: # 自动获取和释放锁
            # === 临界区开始 ===
            current_balance = self.balance
            self.balance = current_balance + 1
            # === 临界区结束 ===
  • 这是解决该类问题的最优方案,本身我们存款的这个变量在同一时间(代码层面)就只能有一个人操作,完全保证了操作的原子性。
方案二:threading.RLock (可重入锁)

💡 RLock (Re-entrant Lock) 允许同一个线程多次获取同一个锁而不会造成死锁

使用可重入锁解决银行问题:

# 代码与Lock版本完全相同,只是将 threading.Lock() 换成 threading.RLock()
class BankAccountWithRLock:
    def __init__(self):
        self.balance = 0
        self.lock = threading.RLock() # 使用RLock

    def deposit(self):
        with self.lock:
            current_balance = self.balance
            self.balance = current_balance + 1
  • 用法基本一致,只是更换了锁的类型。
  • 但没必要,因为我们不需要一个人多次获取同一个锁,反而会造成额外开销。
  • RLock最佳应用场景为:当一个函数或方法内部,需要调用另一个同样需要加锁的函数或方法时

银行场景升级:在存款的时候需要验证你当日存款的次数是否达到上限或其它需要校验的场景。

class ComplicatedAccount:
    def __init__(self):
        self.balance = 0
        self.lock = threading.RLock()

    def _internal_check(self):
        with self.lock: # 第二次获取锁
            # ... 做一些内部检查 ...
            pass

    def deposit_with_check(self, amount):
        with self.lock: # 第一次获取锁
            self._internal_check() # 如果这是普通Lock,这里会永远等待自己释放锁,造成死锁
            self.balance += amount
方案三:threading.Semaphore (信号量)

💡 Semaphore 的核心是一个管理计数器的锁,用于控制能同时访问特定资源的线程数量

# 我们需要将信号量的初始值设为1,来模拟Lock的行为
semaphore = threading.Semaphore(1)

def deposit_with_semaphore(account):
    with semaphore:
        current_balance = account.balance
        account.balance = current_balance + 1

虽然但是这个方案可以解决银行的问题,但这其实是邪修,从可读性来说就有些抽象,为什么要限制信号量为1,因为本身就是限制线程数量的,他们还以为只能有一个人访问呢。

  • Semaphore 最佳场景:限制对有限资源的访问,比如数据库的连接池和API的调用次数。
# 假设我们的数据库最多只支持5个并发连接
db_connection_pool_semaphore = threading.Semaphore(5)

def query_database():
    with db_connection_pool_semaphore:
        # 这里最多只会有5个线程同时执行
        conn = get_db_connection()
        # ... 执行查询 ...
        release_db_connection(conn)

当然实际上并发量肯定不会这么小,但总归有一个限制,因为数据库属于最核心的资源,它一旦宕机就意味着系统挂掉了,虽然运维有备份数据库的操作,频率一般都是一周或者3天,也不会说宕机后直接切换到另一台数据库,顶多会切换到一台保持系统正常运行的数据库,里面存储的都是你的登录信息、用户配置等等,保证你能正常登录系统,但是其余的业务场景都会禁止你访问。

而且我们经常访问有些网站时会出现系统繁忙稍后访问的情况,原理就是类似上面的信号量,它限制同一时间不能出现太多用户挤进去,保证系统的健康运行。

⚠️ 接下来的几种方案都不适合解决上面的银行问题!!!

方案四:threading.Event (事件)

💡 这是一个简单的信号标识,用于暂停和唤醒线程,而对象是所有的线程,并不适合上述的银行场景,具体上面我们说过,这里简单例子说明。

event = threading.Event()

def worker():
    event.wait() # 所有worker在此等待发令枪响
    print("开始赛跑!")

# 主线程准备...
time.sleep(2)
event.set() # 发令!
方案五:threading.Condition (条件变量)——重点

💡 这是一个带有条件的Lock锁,可以通过条件释放和等待线程,是生产者-消费者常用的手段,非常经典。

  • 例子中有2个生产和消费者,同时启动。
  • 生产和消费的逻辑是单独的。
  • 设置了缓冲区最大数量,防止生产者唤醒了另一个生成者,导致生产数量过剩。
import threading
import time
import random

# 共享资源
items = []
MAX_SIZE = 5  # 缓冲区最大容量
condition = threading.Condition()  # 默认内部隐含创建一个 Lock

def producer(name):
    """生产者:生产数据放入列表"""
    while True:
        with condition:
            # 1. 检查是否满足生产条件(缓冲区不能满)
            # 必须使用 while 而不是 if,防止虚假唤醒
            while len(items) >= MAX_SIZE:
                print(f"🔴 {name}: 缓冲区满 ({len(items)}), 等待空位...")
                condition.wait()  # 释放锁并挂起,等待被唤醒
            
            # 2. 生产数据
            item = random.randint(1, 100)
            items.append(item)
            print(f"🟢 {name}: 生产了 {item} \t目前队列: {items}")
            
            # 3. 唤醒等待的线程(通常是消费者,但也可能是其他生产者)
            condition.notify_all()
        
        # 模拟生产耗时 (放在锁外面,提高并发效率)
        time.sleep(random.uniform(0.5, 1.5))

def consumer(name):
    """消费者:从列表取出数据"""
    while True:
        with condition:
            # 1. 检查是否满足消费条件(缓冲区不能空)
            while not items:
                print(f"🟡 {name}: 缓冲区空, 等待数据...")
                condition.wait() # 释放锁并挂起
            
            # 2. 消费数据
            item = items.pop(0)
            print(f"🔵 {name}: 消费了 {item} \t目前队列: {items}")
            
            # 3. 唤醒等待的线程(通常是生产者)
            condition.notify_all()
        
        # 模拟消费耗时
        time.sleep(random.uniform(0.8, 1.8))

if __name__ == "__main__":
    # 启动 2 个生产者和 2 个消费者
    p1 = threading.Thread(target=producer, args=("生产者A",))
    p2 = threading.Thread(target=producer, args=("生产者B",))
    c1 = threading.Thread(target=consumer, args=("消费者X",))
    c2 = threading.Thread(target=consumer, args=("消费者Y",))

    p1.start()
    p2.start()
    c1.start()
    c2.start()

仔细阅读这个代码,和一般例子中的单生产 / 消费者有很大区别,首先condition自带Rlock锁,如果你是单生产消费模式可以手动传入一个Lock锁,比起默认的Rlock效率较快。

同时我们设置了最大缓冲区,防止了生产过剩,因为线程是抢占式的,有可能消费者的下一个线程还是消费者,生产者下一个线程还是生产者。

因为不是但消费生产模式,我们使用了notify_all的方法,通知了所有线程,保证了公平性,让所有空闲的线程抢占锁,如果是单生产消费模式直接使用notify 方法即可。

在消费者判断的时候必须使用while not items 的while写法,不能使用if,因为可能存在线程自动唤醒(情况较少),或者说我们这里是通过notifyy_all的方法,有可能同时唤醒了两个消费者,但是其中一个消费者抢到了,然后消费了一个,此时列表为空,如果是if判断,因为在那一时刻列表不为空,此时另外的线程就会判断错误,直接消费,但是没有数据,就会报错。

进阶的写法可以将while替换为wait_for(predicate),内部自己会进行while循环,也是常用的手段。

def consumer():
    with condition:
        # 等待直到 items 不为空
        # wait_for 内部实现了 while not predicate(): wait()
        condition.wait_for(lambda: len(items) > 0) 
        
        item = items.pop(0)
        print(f"消费: {item}")
        condition.notify_all()
方案六:queue.Queue (线程安全队列)——重点
  • 先看代码,代码比较简单,我们重点理解它的思路
import queue

# 任务队列,存放要执行的操作
task_queue = queue.Queue()

# 银行家线程,是唯一一个能接触到账户余额的线程
def banker(account):
    while True:
        # get()是阻塞的,并且是原子的
        task = task_queue.get() 
        if task is None: # 结束信号
            break
        # 因为只有这一个线程在操作,所以永远不会有竞态条件
        account.balance += 1 
        task_queue.task_done()

# --- 主流程 ---
account = BankAccount()
banker_thread = threading.Thread(target=banker, args=(account,))
banker_thread.start()

# 1000个客户线程(生产者)
for _ in range(1000):
    task_queue.put("deposit_1_dollar") # 往队列里放任务

task_queue.join() # 等待队列中所有任务被处理完毕
task_queue.put(None) # 发送结束信号
banker_thread.join()

print(f"最终余额: {account.balance}") # 结果永远是1000

队列其实和Lock差不多,将核心操作使用了一个列表进行排队执行,这样也能保证操作的原子性。

那有人就问了,为什么还需要队列的操作,从本质上来说这都是在同一时间进行了一个操作,其实都是顺序执行的?

  • 首先我们一定要遵循一个设计原则,耗时的操作尽量放在生产者里面,因为生产者是并发的,消费者是串行的。
  • 如果真的在消费者需要耗时操作,那就要考虑增加进程,增加消费者,让消费者的耗时操作并行,但核心的存取操作依旧是需要加锁的
  • 这样可以保证程序的执行速度,但并不是队列最大的优势,其最大的优势是:“削峰填谷”。

削峰填谷的核心是什么,如果你一次性进入了10000个客户,如果你一味地加锁,那会导致CPU的计算量瞬间达到一个峰值,然后系统啪的一声死掉了,而队列就不会,削峰意思就是减少同一时间的并发量,保证系统的健康运行,填谷相当于将时间增大了,队列会保证1s内执行100个,但这样会增加处理延迟。

  • Lock模式(同步等待)
    客户 A 要存钱。如果锁被占用了,客户 A 必须在原地死等,什么也干不了。如果存钱操作很慢,客户 A 的整个线程就卡死在那里了。
  • Queue模式(异步提交)
    客户 A 把存钱请求往 task_queue 里一扔(put 动作通常极快),立马就可以转身去做别的事(比如去处理下一个客户的请求,或者进行复杂的计算)。他不需要等待 Banker 真的把钱存进去。

3.4. 线程局部数据

  • threading.local: 为每个线程创建独立的存储空间。

💡 不要小瞧这个知识点,虽然使用起来简单,但是从设计模式上来说非常高级,先来看一个普通全局变量和线程局部数据的区别。

  • 普通全局变量:就像酒店大堂的留言板。线程 A 刚写上“我是张三”,线程 B 过来把它擦掉写上“我是李四”。线程 A 回头一看,懵了:“我是李四?”(这就是竞态条件,数据乱了)。
  • threading.local:就像酒店的房间。
    • 虽然大家都拿着房卡去开 “101号对象” 的门。
    • 但是,线程 A 的房卡打开的是 A 专属的 101 房间。
    • 线程 B 的房卡打开的是 B 专属的 101 房间。
    • A 在房间里放了一瓶水,B 进自己的房间是看不到这瓶水的。

场景:记录当前处理任务的“用户名”

import threading
import time

# === 1. 创建一个 local 对象 ===
# 注意:它是全局定义的,但属性是隔离的
ctx = threading.local()

def process_user_request(user_name):
    # === 2. 在当前线程中绑定属性 ===
    # 看起来是修改同一个全局对象,其实只是修改了当前线程的副本
    ctx.user = user_name
    
    print(f"[{user_name}] 线程开始处理,存入 ctx.user = {ctx.user}")
    
    # 模拟耗时操作,导致线程切换
    time.sleep(1)
    
    # === 3. 读取属性 ===
    # 这里的 ctx.user 依然是当前线程之前存进去的值,不会被其他线程覆盖
    print(f"[{user_name}] 处理完成,读取 ctx.user = {ctx.user}")

# 启动两个线程
t1 = threading.Thread(target=process_user_request, args=("张三",))
t2 = threading.Thread(target=process_user_request, args=("李四",))

t1.start()
t2.start()

t1.join()
t2.join()
  • 作用:实现数据的线程隔离。每个线程都有自己独立的数据副本。
  • 优势
    • 无需加锁:因为数据不共享,不存在竞争。
    • 解决传参麻烦:也就是所谓的“上下文(Context)管理”。
  • 经典实战
    • Flask / Django:当你写 Web 后端时,不同用户的请求由不同线程处理。你在代码里任意地方 import request 对象,就能拿到当前请求的信息,而不会拿到别人的请求信息。这就是通过类似 threading.local 的机制实现的。
    • 数据库连接:每个线程应该使用自己独立的数据库连接(Connection),而不是共享同一个。通常数据库连接池会把连接绑定到 threading.local 上。

4. 核心工具:队列 (queue 模块)

4.1. 队列的角色

  • 线程安全 (Thread Safety):你不再需要手动使用 Lock 来保护列表。队列内部已经实现了锁机制,多个线程同时存取不会导致数据错乱。
  • 解耦 (Decoupling):生产者不需要知道消费者是谁,也不需要等待消费者处理完,它只管把任务丢进管道
  • 缓冲 (Buffering):当流量激增时,队列充当蓄水池,保护下游消费者不被压垮。

4.2. 核心方法

  • q.put(item): 放入元素 (阻塞)。
  • q.get(): 取出元素 (阻塞)。
  • q.qsize(): 返回队列大小。
  • q.empty(): 判断队列是否为空。
  • q.full(): 判断队列是否已满。
  • 非阻塞操作: put_nowait(), get_nowait()

queue 的核心在于阻塞 (Blocking) 机制。理解这一点,你就能控制线程是该“挂起等待”还是“立即报错”。

1. 阻塞式存取(最常用)

  • q.put(item, block=True, timeout=None):
    • 逻辑:把数据放进队列。如果队列满了(设置了 maxsize),当前线程会卡在这里(挂起),直到有空位。
    • Timeout:如果设置了 timeout=3,3秒后还没空位,抛出 queue.Full 异常。
  • q.get(block=True, timeout=None):
    • 逻辑:从队列取出数据。如果队列空了,当前线程会卡在这里(挂起),直到有新数据进来。
    • Timeout:如果设置了 timeout=3,3秒后还没数据,抛出 queue.Empty 异常。

2. 非阻塞式存取(即刻返回)

  • q.put_nowait(item) / q.get_nowait():
  • 逻辑:完全不等。如果队列满了(放不进去)或空了(取不到),立刻抛出异常。
  • 等价于:block=False 的调用方式。

3. 状态检查(⚠️ 有并发陷阱)

  • q.qsize(): 返回队列大小。
  • q.empty() / q.full(): 返回是否为空/满。

注意:在多线程下结果不可靠。

  • 错误写法:if not q.empty(): q.get()
  • 原因:判断不为空的那一瞬间,可能另一个线程刚好把最后一个数据取走了,导致紧接着的 get() 报错或阻塞。
  • 正确写法:直接 q.get() 并捕获 Empty 异常。

4. 任务跟踪(优雅退出的关键)

  • q.task_done(): 消费者告诉队列:“我刚取出的这个任务已经处理完毕了”。
  • q.join(): 主线程调用此方法会阻塞,直到队列中所有被 put 进去的任务都已经被 get 并且都执行了 task_done()。

4.3. queue 模块的基本使用

  • queue.Queue: 先进先出 (FIFO) 队列。
  • queue.LifoQueue: 后进先出 (LIFO) 队列。
  • queue.PriorityQueue: 优先级队列。

1. queue.Queue (先进先出 FIFO)

💡 最常用的队列。就像排队,先来的先服务。

简单用法

q = queue.Queue()
q.put(1); q.put(2)
print(q.get()) # 结果: 1

真实场景:日志聚合器

import logging

log_queue = queue.Queue()

def logger_thread():
    """专门写文件的线程"""
    while True:
        record = log_queue.get()
        if record is None: break # 退出信号
        # 模拟写文件操作 (串行写入,绝对安全)
        with open("app.log", "a") as f:
            f.write(f"[LOG] {record}\n")
        log_queue.task_done()

# 启动日志服务
threading.Thread(target=logger_thread, daemon=True).start()

# 模拟:多个业务线程并发产生日志
def business_logic(user_id):
    # 业务代码...
    # 只需要把日志丢进队列,极快,不阻塞业务
    log_queue.put(f"User {user_id} logged in")

# 模拟并发
for i in range(5):
    threading.Thread(target=business_logic, args=(i,)).start()

2. queue.LifoQueue (后进先出 LIFO)

简单用法

q = queue.LifoQueue()
q.put("操作1"); q.put("操作2")
print(q.get()) # 结果: 操作2

真实场景:文本编辑器的“撤销”功能 (Undo)

history_queue = queue.LifoQueue()

def user_type(text):
    print(f"用户输入: {text}")
    history_queue.put(text) # 存入历史

def undo_manager():
    print("\n--- 用户触发撤销(Ctrl+Z) ---")
    try:
        # 获取最近一次的操作
        last_action = history_queue.get(block=False)
        print(f"撤销了: '{last_action}'")
    except queue.Empty:
        print("没有可撤销的操作")

# 模拟操作流
user_type("Hello")
user_type("World")
user_type("Python")

# 现在队列是: [Hello, World, Python]
undo_manager() # 应该撤销 Python
undo_manager() # 应该撤销 World

queue.PriorityQueue (优先级队列)

💡 元素自带权重,权重越低(数字越小),优先级越高。

简单用法:

q = queue.PriorityQueue()
q.put((10, "普通任务")); q.put((1, "紧急任务"))
# 虽然紧急任务后放进去,但 get 出来的是它
print(q.get()) # 结果: (1, '紧急任务')

真实场景:VIP 客服排队系统

request_queue = queue.PriorityQueue()

def server_worker():
    while True:
        # 取出的永远是当前优先级最高的
        priority, user_name = request_queue.get()
        print(f"正在服务: {user_name} (等级: {priority})")
        time.sleep(0.5)
        request_queue.task_done()

threading.Thread(target=server_worker, daemon=True).start()

# 模拟请求到达:普通用户先到
request_queue.put((10, "普通用户 小张"))
request_queue.put((10, "普通用户 小李"))

# 突然来了个 VIP,虽然他最后来,但因为优先级是 1
print(">>> VIP 用户到达 <<<")
request_queue.put((1, "VIP 王老板"))

request_queue.join()
# 输出顺序保证:VIP 王老板 -> 小张 -> 小李

5. 现代并发库:concurrent.futures

💡 Futures无论是Python还是Java,都是后面新出的多线程管理工具,极大地简化了我们对于多线程的使用,并且提高了效率,排除特殊情况外,这是我们后期使用的主力军。

5.1. 更高级的抽象

  • Executor 接口:提供统一的异步任务执行模型。

💡 Executor 是 concurrent.futures 模块的核心抽象基类。它的核心思想是将“任务的提交”(Producer)与“任务的执行”(Consumer)解耦,而且其实现了上下文管理协议(Context manager),我们可以直接使用with语句自动关闭,相当的丝滑。

简单一句话,你只需要任务的创建,其余生命周期由它自己调用。

from concurrent.futures import ThreadPoolExecutor

def task(name):
    print(f"Hello, {name}")

# 使用 with 管理资源,无需手动 shutdown
with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(task, "Python")
    executor.submit(task, "Java")

Executor 有两个具体的实现类,选择哪一个取决于你的任务类型,分别对应线程池和进程池。

5.2. ThreadPoolExecutor

  • 适用场景:用于 I/O 密集型任务的线程池。(如:爬虫、API请求、数据库读写)
  • 底层原理:复用线程。
  • 关键限制 (GIL):由于 Python 全局解释器锁 (GIL) 的存在,同一时刻只能有一个线程在 CPU 上执行 Python 字节码。因此,它无法利用多核 CPU 进行并行计算。但在等待 I/O 时,GIL 会释放,其他线程可以运行。
  • 核心方法
    • executor.submit(fn, *args, **kwargs): 提交任务,返回 Future 对象。
    • executor.map(fn, *iterables): 高效地对可迭代对象应用函数。
from concurrent.futures import ThreadPoolExecutor
import time
import threading

def io_bound_task(n):
    # 模拟 I/O 等待,此时释放 GIL
    time.sleep(1)
    return f"I/O 任务 {n} 完成,线程ID: {threading.get_ident()}"

with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(io_bound_task, 1)
    print(future.result())

5.3. ProcessPoolExecutor

  • 适用场景:CPU 密集型任务(如:图像处理、复杂数学运算、机器学习推理)。
  • 底层原理:创建多个独立的 Python 进程。
  • 突破 GIL:每个进程有自己独立的解释器和内存空间(以及独立的 GIL)。因此,它可以利用多核 CPU 实现真正的并行。
from concurrent.futures import ProcessPoolExecutor
import os

def cpu_bound_task(n):
    # 纯计算任务
    result = sum(i * i for i in range(n))
    return f"CPU 任务完成,进程ID: {os.getpid()}"

# Windows 下必须放在 if __name__ == '__main__': 之下
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(cpu_bound_task, 1000000)
        print(future.result())

注意⚠️ :在Java的多线程执行中,如果涉及线程间通信,那效率是要比Python快的,是因为Java堆内存的存在,而Python中需要依赖IPC和Pickle序列化,开销比较大

5.4. Future 对象

💡 当你调用 executor.submit() 时,它不会立即执行完成,而是立即返回一个 Future 对象。这是一张“期票”,代表未来某个时刻会产生的结果,和智能电饭煲一样,米饭蒸好了直接自动开启保温模式,你根本不用管。

  • 核心方法
    • future.result(): 获取任务返回值 (阻塞)。
    • future.done(): 判断任务是否完成。
    • future.add_done_callback(fn): 任务完成后执行回调函数。
from concurrent.futures import ThreadPoolExecutor
import time

def long_running_task():
    time.sleep(2)
    return 42

def on_complete(future):
    print(f"【回调】任务结束,结果是: {future.result()}")

with ThreadPoolExecutor() as executor:
    # 1. 提交任务,获得 Future
    future = executor.submit(long_running_task)
    
    # 2. 注册回调 (非阻塞)
    future.add_done_callback(on_complete)
    
    # 3. 可以在这里做其他事
    print("主线程继续运行...")
    
    # 4. 如果需要,可以阻塞等待结果
    # print(future.result())

5.5 批量处理和控制

map vs as_completed

  • executor.map(func, iterables):
    • 类似内置 map,但是并发执行
    • 关键点:结果返回的顺序严格对应输入参数的顺序。如果第一个任务很慢,即使第二个任务先做完,迭代器也会阻塞等待第一个。
  • as_completed(futures):
    • 这是一个生成器,谁先做完,谁先返回
    • 适合处理那些不关心结果顺序,只关心处理速度的场景。
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def task(n):
    sleep_time = random.uniform(0.5, 2.0)
    time.sleep(sleep_time)
    return f"任务 {n} (耗时 {sleep_time:.2f}s)"

with ThreadPoolExecutor() as executor:
    # 批量提交
    futures = [executor.submit(task, i) for i in range(3)]
    
    print("开始获取结果(谁先完成显示谁):")
    # 使用 as_completed 处理乱序结果
    for f in as_completed(futures):
        print(f.result())

wait (同步栅栏)

💡 concurrent.futures.wait 是一个非常实用的函数,它允许你对一组 Future 对象进行细粒度的流程控制。它在功能上与 Java 的 CountDownLatch 或 CompletableFuture.allOf/anyOf 非常相似。

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
  • fs: 一个包含 Future 对象的列表(或集合)。
  • return_when: 决定函数何时返回(取消阻塞),有三种枚举值:
    • ALL_COMPLETED (默认): 所有任务都结束(完成或取消)才返回。
    • FIRST_COMPLETED: 只要有一个任务结束就返回。
    • FIRST_EXCEPTION: 只要有一个任务抛出异常就返回(如果没有异常,就等所有任务完成)。
  • 返回值: 返回一个元组 (done, not_done),分别包含已完成的 Future 集合和未完成的 Future 集合。
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time
import random

def task(name, duration):
    time.sleep(duration)
    return f"Task {name} finished in {duration}s"

# 创建线程池
executor = ThreadPoolExecutor(max_workers=5)

# 准备一组任务
futures = []
print("--- 提交任务 ---")
# 模拟不同耗时的任务
for i in range(3):
    # 随机耗时 1~3秒
    duration = random.randint(1, 3)
    f = executor.submit(task, f"Worker-{i}", duration)
    futures.append(f)

# ---------------------------------------------------------
# 场景 1: 等待所有任务完成 (类似 Java CountDownLatch / CompletableFuture.allOf)
# ---------------------------------------------------------
print(f"Main Thread: Waiting for ALL tasks...")
done, not_done = wait(futures, return_when=ALL_COMPLETED)

print(f"Main Thread: All done. Completed count: {len(done)}")
for f in done:
    print(f" -> {f.result()}")

print("-" * 30)

# ---------------------------------------------------------
# 场景 2: 等待第一个完成 (类似 Java CompletableFuture.anyOf)
# ---------------------------------------------------------
# 再次提交一组新任务演示 FIRST_COMPLETED
race_futures = [executor.submit(task, f"Racer-{i}", random.uniform(0.5, 2.0)) for i in range(3)]

print(f"Main Thread: Waiting for the FIRST task to finish...")
# 阻塞直到任意一个任务完成
done, not_done = wait(race_futures, return_when=FIRST_COMPLETED)

# 此时 done 集合里至少有一个 future
first_result = list(done)[0].result()
print(f"Main Thread: We have a winner! -> {first_result}")

# 注意:未完成的任务仍在后台运行,wait 不会自动取消它们
print(f"Still running tasks: {len(not_done)}")

# 可以在这里取消剩下的任务 (best practice)
for f in not_done:
    f.cancel()

executor.shutdown()

在Java中,CompletableFuture.allOf() 或者 CountDownLatch 对应Python的 ALL_COMPLETEDFIRST_COMPLETED 对应 CompletableFuture.anyOf() 或者 ExecutorService.invokeAny()

6. 对比、选择与最佳实践

6.1. 进程 vs. 线程:一张图看懂

特性 Python ThreadPoolExecutor Python ProcessPoolExecutor Java ExecutorService (Thread Pool)
核心机制 线程 (Thread) 进程 (Process) 线程 (Native Thread)
并行能力 伪并行 (受 GIL 限制) 真并行 (多核 CPU) 真并行 (多核 CPU)
内存开销 低 (共享内存) 高 (独立内存复制) 低 (共享内存)
数据通信 简单 (共享对象) 复杂 (Pickle 序列化) 简单 (共享对象)
推荐场景 I/O 密集型 (网络, 文件) CPU 密集型 (计算, 图像) 所有场景 (I/O 和 CPU)

6.2. 决策指南:何时使用什么?

  1. 如果你是做爬虫、请求 API:闭眼选择 ThreadPoolExecutor。默认 max_workers 通常是 CPU 核心数 * 5 或更多。
  2. 如果你是做数据分析、Pandas 处理:优先看库本身是否支持并行(很多 NumPy 底层已释放 GIL)。如果不行,使用 ProcessPoolExecutor。
  3. 关于死锁:在 ThreadPoolExecutor 中,如果在任务内再提交任务给同一个 Executor 并且等待结果,可能会导致死锁(线程耗尽)。Java 也有此风险,但在 Python 中由于线程数通常较少,更容易触发。
  4. 与 Java 交互:如果你习惯了 Java 的 CompletableFuture 链式调用,Python 的 Future 虽然有回调,但链式支持不如 Java 强大。对于复杂的异步编排(A做完 -> B做完 -> C),Python 3.4+ 推荐使用 asyncio 协程库,而不是 concurrent.futures。

总结

这里到此结束,接下来我们将继续Python核心知识:web和通信。

Logo

昇腾计算产业是基于昇腾系列(HUAWEI Ascend)处理器和基础软件构建的全栈 AI计算基础设施、行业应用及服务,https://devpress.csdn.net/organization/setting/general/146749包括昇腾系列处理器、系列硬件、CANN、AI计算框架、应用使能、开发工具链、管理运维工具、行业应用及服务等全产业链

更多推荐