Python 并行编程(多线程、多进程)

本文部分内容摘自 Python 官方文档《Python并行编程手册》

一、基于线程的并行

1.1 threading 模块的介绍

threading 是在底层模块 _thread 的上层封装,具体用法参见:threading — Thread-based parallelism

多线程适合处理 IO密集型 任务,比如网络请求,读写磁盘文件等,理由如下:

CPython由于GIL的存在,在任一时刻只能有一个线程执行Python字节码,所以多线程实际上无法并行执行,即无法利用CPU多核,导致无法提高计算密集型的运行效率。 但是在执行IO操作时,线程会主动释放GIL锁,让出执行权,所以对于IO密集型的程序,多线程能够提高运行效率。

使用示例

Python 官方文档例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import threading
import time

def crawl(link, delay=3):
    print(f"crawl started for {link}")
    time.sleep(delay)  # Blocking I/O (simulating a network request)
    print(f"crawl ended for {link}")

links = [
    "https://python.org",
    "https://docs.python.org",
    "https://peps.python.org",
]

# Start threads for each link
threads = []
for link in links:
    # Using `args` to pass positional arguments and `kwargs` for keyword arguments
    t = threading.Thread(target=crawl, args=(link,), kwargs={"delay": 2})
    threads.append(t)

# Start each thread
for t in threads:
    t.start()

# Wait for all threads to finish
for t in threads:
    t.join()

该模块的主要用法是:

  1. 使用 threading.Thread 实例化一个线程对象;
  2. 使用 .start 方法启动这个线程;
  3. 使用 .join 声明主线程在这个线程结束后再退出。

签名参数解释

threading.Thread 的函数签名如下:

1
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
参数 说明
group 应该为 None,给未来 ThreadGroup 使用的保留参数
target run() 方法调用的可执行对象,即启动一个线程时执行的函数,默认为 None
name 线程名字,如果不指定则默认格式是:Thread-N(N是小十进制数字),如果不指定 name 但是指定了target参数,则格式为:Thread-N(target)(括号中的target为函数名)
args 列表或元组,值是调用 target 所需要的参数,默认是 ()
kwargs 字典,值是调用 target 所需要的参数,默认是 {}
daemon 是否守护进程,如不指定则从当前线程继承。(Python 3.3 加入此参数)

如果自定义线程子类覆写了这个构造器,则必须首先调用基类的构造方法 Thread.__init__() 再做其他操作。

name 即线程名字在 threading.py 源码(Python 3.12)中的实现摘录如下:

(Python 3.10 之前,即便指定了 target 参数,name 的默认名字仍然是:Thread-N

  1. 生成了一个从 1 开始,步长为 1 的无限迭代器 _counter
  2. 如果指定了 name 则直接使用 name
  3. 如果没有指定 name,则使用 Thread-%d(其中 %d 为迭代器的下一个值)。
  4. 如果指定了 target,则在后面拼接上 target__name__(即函数名)。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# threading.py
from itertools import count as _count

# Helper to generate new thread names
_counter = _count(1).__next__

def _newname(name_template):
    return name_template % _counter()

class Thread:
    _initialized = False

    def __init__(
        self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None
    ):
        assert group is None, "group argument must be None for now"
        if kwargs is None:
            kwargs = {}
        if name:
            name = str(name)
        else:
            name = _newname("Thread-%d")
            if target is not None:
                try:
                    target_name = target.__name__
                    name += f" ({target_name})"
                except AttributeError:
                    pass

线程对象方法说明

方法 说明
start() 启动一个线程,一个线程只能调用一次
run() 线程实际执行的函数,可在自定义线程子类中覆写
join(timeout=None) 等待此线程结束再继续执行
name 用于标识线程,没有语义,可重复。默认值由构造器指定
ident Python分配的线程ID,进程周期内可能被回收重用,用于Python内部标识
native_id 操作系统分配的线程ID,进程周期内唯一,用于操作系统级交互
is_alive() run()方法启动后,结束前,会返回 True
daemon 布尔值,指示线程是否守护进程。整个Python程序会在所有非守护进程结束后再退出

注意事项

CPython 中,由于 GIL(Global Interpreter Lock) 的存在,限制了Python进程在任一时刻,只能有一个线程执行一个 Python 字节码(不过在IO密集型场景下,这种多线程模式已经够用了)。

如果你想更好的利用多核机器的资源,推荐使用(多进程) multiprocessingconcurrent.futures.ProcessPoolExecutor 库。

1.2 threading 模块的3种使用方法

方法一:创建Thread实例,直接传入目标函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
import time

def worker(num):
    """线程执行的任务函数"""
    print(f"Worker {num} started")
    time.sleep(2)  # 模拟耗时操作
    print(f"Worker {num} finished")

# 创建线程
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()  # 启动线程

# 等待所有线程完成
for t in threads:
    t.join()

print("All threads completed")

方法二:继承Thread类并重写run方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num
    
    def run(self):
        """重写run方法"""
        print(f"Custom thread {self.num} started")
        time.sleep(1)  # 模拟耗时操作
        print(f"Custom thread {self.num} finished")

# 创建并启动线程
threads = []
for i in range(3):
    t = MyThread(i)
    threads.append(t)
    t.start()

# 等待所有线程完成
for t in threads:
    t.join()

print("All custom threads completed")

方法三:使用线程池ThreadPoolExecutor (Python 3.2+)

线程池可以控制同时运行的线程数量。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from concurrent.futures import ThreadPoolExecutor
import time

def task(name):
    print(f"Task {name} started")
    time.sleep(2)
    print(f"Task {name} finished")
    return f"Result of {name}"

# 使用线程池
with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交任务到线程池
    futures = [executor.submit(task, i) for i in range(5)]
    
    # 获取结果
    for future in futures:
        print(future.result())

print("All tasks completed")

1.3 同步机制

Lock(互斥锁)

作用: 最基本的锁,保证同一时刻只有一个线程能访问共享资源。

特点: 不可重入(同一线程重复获取会死锁)。 只能被获取它的线程释放。

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import threading

lock = threading.Lock()
counter = 0

def increment():
    global counter
    with lock:  # 自动获取和释放锁
        counter += 1

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 输出 10

RLock(可重入锁)

作用: 允许同一线程多次获取锁,避免死锁。

特点: 必须由获取锁的线程释放相同次数的锁。

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import threading


rlock = threading.RLock()

def recursive_func(n):
    if n > 0:
        with rlock:  # 可重入
            recursive_func(n - 1)

recursive_func(5)  # 不会死锁

Semaphore(信号量)

作用: 控制同时访问资源的线程数量。

特点: 维护一个计数器,acquire()减少,release()增加。 可用于限流(如数据库连接池)。

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import time
import threading

semaphore = threading.Semaphore(3)  # 最多3个线程同时运行

def task():
    with semaphore:
        print(threading.current_thread().name, "working")
        time.sleep(1)

for i in range(10):
    threading.Thread(target=task).start()

Condition(条件变量)

作用: 让线程等待特定条件满足后再执行。

特点: 通常与Lock或RLock一起使用。 通过wait()、notify()/notify_all()实现线程间通信。

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import threading


condition = threading.Condition()
items = []

def consumer():
    with condition:
        while not items:
            condition.wait()  # 等待生产者通知
        print("Consumed:", items.pop())

def producer():
    with condition:
        items.append(1)
        condition.notify()  # 唤醒一个消费者

threading.Thread(target=consumer).start()
threading.Thread(target=producer).start()

Event(事件)

作用: 线程间简单的通知机制,一个线程发出事件,其他线程等待。

特点: 通过set()、clear()和wait()控制。 一次性广播(set()后所有wait()的线程被唤醒)。

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import time
import threading


event = threading.Event()

def waiter1():
    print("waiter1 Waiting for event...")
    event.wait()
    print("waiter1: Event triggered!")

def waiter2():
    print("waiter2 Waiting for event...")
    event.wait()
    print("waiter2: Event triggered!")

threading.Thread(target=waiter1).start()
threading.Thread(target=waiter2).start()

time.sleep(2)
event.set()  # 唤醒所有等待线程

Barrier(屏障)

作用: 让一组线程等待,直到所有线程到达某个点后再继续。

特点: 类似“集合点”机制。 适用于分阶段任务。

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import time
import threading


barrier = threading.Barrier(3)  # 需要3个线程到达

def worker():
    print("Reached barrier")
    barrier.wait()
    print("Passed barrier")

for _ in range(3):
    threading.Thread(target=worker).start()
    time.sleep(1)

Queue(队列)

作用: 线程安全的FIFO数据结构,用于生产者-消费者模型。

特点: 自动处理锁的获取和释放。 支持put()、get()等操作。

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
import threading


from queue import Queue

q = Queue()

def producer():
    q.put("data")

def consumer():
    print("Got:", q.get())

# 先启动两个消费线程
threading.Thread(target=consumer).start()
threading.Thread(target=consumer).start()
# 3秒后启动一个生产线程
time.sleep(3)
threading.Thread(target=producer).start()  # 数据生产后,其中一个消费线程会拿到数据并退出,另一个线程继续等待
# 2秒后再启动一个生产线程
time.sleep(2)
threading.Thread(target=producer).start()  # 数据再生产后,另一个消费线程拿到数据退出,主线程结束

关键区别总结

机制 用途 重入性 线程唤醒方式 适用场景
Lock 互斥访问资源 不可重入 - 简单同步
RLock 同一线程多次获取锁 可重入 - 嵌套锁场景
Semaphore 限制并发线程数 计数控制 - 资源池、限流
Condition 条件满足后执行 需搭配锁 notify()/notify_all() 复杂条件等待
Event 简单线程通知 - set()广播 一次性通知
Barrier 多线程同步到同一阶段 - 全部到达后自动唤醒 分阶段任务
Queue 线程安全的数据传递 - 阻塞get()/put() 生产者-消费者模型

选择建议

  • 需要互斥访问共享资源 → Lock或RLock。
  • 限制并发数量 → Semaphore。
  • 线程间通信(如生产者-消费者)→ Condition或Queue。
  • 简单通知 → Event。
  • 多线程同步到同一阶段 → Barrier。

二、基于进程的并行

2.1 multiprocessing 模块的介绍

multiprocessing使用子进程而不是线程来绕过GIL锁,所以多进程可以实现真正的并行, 有效利用物理机器的多核性能,因此多进程适合处理计算密集型的程序。 它的API和threading包的基本保持一致。

使用示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import multiprocessing

def worker():
    print("Process working")

processes = []
for _ in range(5):
    p = multiprocessing.Process(target=worker)
    processes.append(p)
    p.start()

for p in processes:
    p.join()

该模块的主要用法是:

  1. 使用 multiprocessing.Process 实例化一个进程对象;
  2. 使用 .start 方法启动这个进程;
  3. 使用 .join 声明主进程在这个进程结束后再退出。

参数签名解释

multiprocessing.Process 的函数签名如下:

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
参数 说明
group 应该始终为 None,这个参数仅仅是为了兼容 threading.Thread 的API
target run() 方法调用的可执行对象,即启动一个进程时执行的函数,默认为 None
name 进程名字,如果不指定则默认格式是:Process-N(N是小十进制数字)
args 列表或元组,值是调用 target 所需要的参数,默认是 ()
kwargs 字典,值是调用 target 所需要的参数,默认是 {}
daemon 是否守护进程,如不指定则从当前进程继承。(Python 3.3 加入此参数)

进程对象方法说明

方法 说明
run() 进程实际执行的函数,可在自定义进程子类中覆写
start() 启动一个进程,一个进程只能调用一次
join(timeout=None) 等待此进程结束再继续执行
name 用于标识进程,没有语义,可重复。默认值由构造器指定
is_alive() run()方法启动后,结束前,会返回 True
daemon 布尔值,指示进程是否守护进程。整个Python程序会在所有非守护进程结束后再退出。
pid 操作系统分配的进程ID
exitcode 子进程推出状态码。正常0、异常1、sys.exit(N)则N、signal N则 -N
terminate() 终止进程。unix用 SIGTERM 信号,win用 TerminateProcess()
kill() terminate() 一样,但是使用 SIGKILL 信号
close() 关闭进程对象,释放所有资源

注意:start()join()is_alive()terminate()exitcode 应该只由进程的创造者调用。

三、Python的协程

3.1 asyncio 模块的介绍

asyncio 是一个通过 async/await 语法,实现并发编程的库。 asyncio 是很多异步框架的底层库,提供了高性能的网络服务、数据库连接服务和任务队列等。

使用示例

1
2
3
4
5
6
7
8
import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

asyncio.run(main())

运行协程并返回结果:

1
asyncio.run(coro, *, debug=None, loop_factory=None)
1
2
3
4
5
6
async def main():
    await asyncio.sleep(1)
    print('hello')

with asyncio.Runner() as runner:
    runner.run(main())

可复用的带上下文管理的Runner对象:

1
class asyncio.Runner(*, debug=None, loop_factory=None)

对比:

特性 asyncio.run() asyncio.Runner().run()
Python 版本 3.7+ 3.11+
事件循环管理 每次调用创建新循环 可复用同一个循环
嵌套运行 ❌ 不允许 ✅ 允许
资源管理 自动清理 需用 with 语句
适用场景 简单脚本 复杂应用、多次运行协程
性能 每次创建/销毁循环 可复用循环,更高效
updatedupdated2026-02-052026-02-05