本文部分内容摘自 Python 官方文档 和 《Python并行编程手册》。
threading 是在底层模块 _thread 的上层封装,具体用法参见:threading — Thread-based parallelism
多线程适合处理 IO密集型 任务,比如网络请求,读写磁盘文件等,理由如下:
CPython由于GIL的存在,在任一时刻只能有一个线程执行Python字节码,所以多线程实际上无法并行执行,即无法利用CPU多核,导致无法提高计算密集型的运行效率。
但是在执行IO操作时,线程会主动释放GIL锁,让出执行权,所以对于IO密集型的程序,多线程能够提高运行效率。
Python 官方文档例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import threading
import time
def crawl(link, delay=3):
print(f"crawl started for {link}")
time.sleep(delay) # Blocking I/O (simulating a network request)
print(f"crawl ended for {link}")
links = [
"https://python.org",
"https://docs.python.org",
"https://peps.python.org",
]
# Start threads for each link
threads = []
for link in links:
# Using `args` to pass positional arguments and `kwargs` for keyword arguments
t = threading.Thread(target=crawl, args=(link,), kwargs={"delay": 2})
threads.append(t)
# Start each thread
for t in threads:
t.start()
# Wait for all threads to finish
for t in threads:
t.join()
|
该模块的主要用法是:
- 使用
threading.Thread 实例化一个线程对象;
- 使用
.start 方法启动这个线程;
- 使用
.join 声明主线程在这个线程结束后再退出。
threading.Thread 的函数签名如下:
1
|
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
|
| 参数 |
说明 |
| group |
应该为 None,给未来 ThreadGroup 使用的保留参数 |
| target |
被 run() 方法调用的可执行对象,即启动一个线程时执行的函数,默认为 None |
| name |
线程名字,如果不指定则默认格式是:Thread-N(N是小十进制数字),如果不指定 name 但是指定了target参数,则格式为:Thread-N(target)(括号中的target为函数名) |
| args |
列表或元组,值是调用 target 所需要的参数,默认是 () |
| kwargs |
字典,值是调用 target 所需要的参数,默认是 {} |
| daemon |
是否守护进程,如不指定则从当前线程继承。(Python 3.3 加入此参数) |
如果自定义线程子类覆写了这个构造器,则必须首先调用基类的构造方法 Thread.__init__() 再做其他操作。
name 即线程名字在 threading.py 源码(Python 3.12)中的实现摘录如下:
(Python 3.10 之前,即便指定了 target 参数,name 的默认名字仍然是:Thread-N)
- 生成了一个从 1 开始,步长为 1 的无限迭代器
_counter。
- 如果指定了
name 则直接使用 name。
- 如果没有指定
name,则使用 Thread-%d(其中 %d 为迭代器的下一个值)。
- 如果指定了
target,则在后面拼接上 target 的 __name__(即函数名)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
# threading.py
from itertools import count as _count
# Helper to generate new thread names
_counter = _count(1).__next__
def _newname(name_template):
return name_template % _counter()
class Thread:
_initialized = False
def __init__(
self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None
):
assert group is None, "group argument must be None for now"
if kwargs is None:
kwargs = {}
if name:
name = str(name)
else:
name = _newname("Thread-%d")
if target is not None:
try:
target_name = target.__name__
name += f" ({target_name})"
except AttributeError:
pass
|
| 方法 |
说明 |
| start() |
启动一个线程,一个线程只能调用一次 |
| run() |
线程实际执行的函数,可在自定义线程子类中覆写 |
| join(timeout=None) |
等待此线程结束再继续执行 |
| name |
用于标识线程,没有语义,可重复。默认值由构造器指定 |
| ident |
Python分配的线程ID,进程周期内可能被回收重用,用于Python内部标识 |
| native_id |
操作系统分配的线程ID,进程周期内唯一,用于操作系统级交互 |
| is_alive() |
run()方法启动后,结束前,会返回 True |
| daemon |
布尔值,指示线程是否守护进程。整个Python程序会在所有非守护进程结束后再退出 |
在 CPython 中,由于 GIL(Global Interpreter Lock) 的存在,限制了Python进程在任一时刻,只能有一个线程执行一个 Python 字节码(不过在IO密集型场景下,这种多线程模式已经够用了)。
如果你想更好的利用多核机器的资源,推荐使用(多进程) multiprocessing 或 concurrent.futures.ProcessPoolExecutor 库。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import threading
import time
def worker(num):
"""线程执行的任务函数"""
print(f"Worker {num} started")
time.sleep(2) # 模拟耗时操作
print(f"Worker {num} finished")
# 创建线程
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start() # 启动线程
# 等待所有线程完成
for t in threads:
t.join()
print("All threads completed")
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import threading
import time
class MyThread(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num
def run(self):
"""重写run方法"""
print(f"Custom thread {self.num} started")
time.sleep(1) # 模拟耗时操作
print(f"Custom thread {self.num} finished")
# 创建并启动线程
threads = []
for i in range(3):
t = MyThread(i)
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print("All custom threads completed")
|
线程池可以控制同时运行的线程数量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
from concurrent.futures import ThreadPoolExecutor
import time
def task(name):
print(f"Task {name} started")
time.sleep(2)
print(f"Task {name} finished")
return f"Result of {name}"
# 使用线程池
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交任务到线程池
futures = [executor.submit(task, i) for i in range(5)]
# 获取结果
for future in futures:
print(future.result())
print("All tasks completed")
|
作用: 最基本的锁,保证同一时刻只有一个线程能访问共享资源。
特点: 不可重入(同一线程重复获取会死锁)。 只能被获取它的线程释放。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import threading
lock = threading.Lock()
counter = 0
def increment():
global counter
with lock: # 自动获取和释放锁
counter += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(counter) # 输出 10
|
作用: 允许同一线程多次获取锁,避免死锁。
特点: 必须由获取锁的线程释放相同次数的锁。
示例:
1
2
3
4
5
6
7
8
9
10
11
|
import threading
rlock = threading.RLock()
def recursive_func(n):
if n > 0:
with rlock: # 可重入
recursive_func(n - 1)
recursive_func(5) # 不会死锁
|
作用: 控制同时访问资源的线程数量。
特点: 维护一个计数器,acquire()减少,release()增加。 可用于限流(如数据库连接池)。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
|
import time
import threading
semaphore = threading.Semaphore(3) # 最多3个线程同时运行
def task():
with semaphore:
print(threading.current_thread().name, "working")
time.sleep(1)
for i in range(10):
threading.Thread(target=task).start()
|
作用: 让线程等待特定条件满足后再执行。
特点: 通常与Lock或RLock一起使用。 通过wait()、notify()/notify_all()实现线程间通信。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import threading
condition = threading.Condition()
items = []
def consumer():
with condition:
while not items:
condition.wait() # 等待生产者通知
print("Consumed:", items.pop())
def producer():
with condition:
items.append(1)
condition.notify() # 唤醒一个消费者
threading.Thread(target=consumer).start()
threading.Thread(target=producer).start()
|
作用: 线程间简单的通知机制,一个线程发出事件,其他线程等待。
特点: 通过set()、clear()和wait()控制。 一次性广播(set()后所有wait()的线程被唤醒)。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import time
import threading
event = threading.Event()
def waiter1():
print("waiter1 Waiting for event...")
event.wait()
print("waiter1: Event triggered!")
def waiter2():
print("waiter2 Waiting for event...")
event.wait()
print("waiter2: Event triggered!")
threading.Thread(target=waiter1).start()
threading.Thread(target=waiter2).start()
time.sleep(2)
event.set() # 唤醒所有等待线程
|
作用: 让一组线程等待,直到所有线程到达某个点后再继续。
特点: 类似“集合点”机制。 适用于分阶段任务。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import time
import threading
barrier = threading.Barrier(3) # 需要3个线程到达
def worker():
print("Reached barrier")
barrier.wait()
print("Passed barrier")
for _ in range(3):
threading.Thread(target=worker).start()
time.sleep(1)
|
作用: 线程安全的FIFO数据结构,用于生产者-消费者模型。
特点: 自动处理锁的获取和释放。 支持put()、get()等操作。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
import time
import threading
from queue import Queue
q = Queue()
def producer():
q.put("data")
def consumer():
print("Got:", q.get())
# 先启动两个消费线程
threading.Thread(target=consumer).start()
threading.Thread(target=consumer).start()
# 3秒后启动一个生产线程
time.sleep(3)
threading.Thread(target=producer).start() # 数据生产后,其中一个消费线程会拿到数据并退出,另一个线程继续等待
# 2秒后再启动一个生产线程
time.sleep(2)
threading.Thread(target=producer).start() # 数据再生产后,另一个消费线程拿到数据退出,主线程结束
|
| 机制 |
用途 |
重入性 |
线程唤醒方式 |
适用场景 |
| Lock |
互斥访问资源 |
不可重入 |
- |
简单同步 |
| RLock |
同一线程多次获取锁 |
可重入 |
- |
嵌套锁场景 |
| Semaphore |
限制并发线程数 |
计数控制 |
- |
资源池、限流 |
| Condition |
条件满足后执行 |
需搭配锁 |
notify()/notify_all() |
复杂条件等待 |
| Event |
简单线程通知 |
- |
set()广播 |
一次性通知 |
| Barrier |
多线程同步到同一阶段 |
- |
全部到达后自动唤醒 |
分阶段任务 |
| Queue |
线程安全的数据传递 |
- |
阻塞get()/put() |
生产者-消费者模型 |
选择建议
- 需要互斥访问共享资源 → Lock或RLock。
- 限制并发数量 → Semaphore。
- 线程间通信(如生产者-消费者)→ Condition或Queue。
- 简单通知 → Event。
- 多线程同步到同一阶段 → Barrier。
multiprocessing使用子进程而不是线程来绕过GIL锁,所以多进程可以实现真正的并行, 有效利用物理机器的多核性能,因此多进程适合处理计算密集型的程序。
它的API和threading包的基本保持一致。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import multiprocessing
def worker():
print("Process working")
processes = []
for _ in range(5):
p = multiprocessing.Process(target=worker)
processes.append(p)
p.start()
for p in processes:
p.join()
|
该模块的主要用法是:
- 使用
multiprocessing.Process 实例化一个进程对象;
- 使用
.start 方法启动这个进程;
- 使用
.join 声明主进程在这个进程结束后再退出。
multiprocessing.Process 的函数签名如下:
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
| 参数 |
说明 |
| group |
应该始终为 None,这个参数仅仅是为了兼容 threading.Thread 的API |
| target |
被 run() 方法调用的可执行对象,即启动一个进程时执行的函数,默认为 None |
| name |
进程名字,如果不指定则默认格式是:Process-N(N是小十进制数字) |
| args |
列表或元组,值是调用 target 所需要的参数,默认是 () |
| kwargs |
字典,值是调用 target 所需要的参数,默认是 {} |
| daemon |
是否守护进程,如不指定则从当前进程继承。(Python 3.3 加入此参数) |
| 方法 |
说明 |
| run() |
进程实际执行的函数,可在自定义进程子类中覆写 |
| start() |
启动一个进程,一个进程只能调用一次 |
| join(timeout=None) |
等待此进程结束再继续执行 |
| name |
用于标识进程,没有语义,可重复。默认值由构造器指定 |
| is_alive() |
run()方法启动后,结束前,会返回 True |
| daemon |
布尔值,指示进程是否守护进程。整个Python程序会在所有非守护进程结束后再退出。 |
| pid |
操作系统分配的进程ID |
| exitcode |
子进程推出状态码。正常0、异常1、sys.exit(N)则N、signal N则 -N |
| terminate() |
终止进程。unix用 SIGTERM 信号,win用 TerminateProcess() |
| kill() |
和 terminate() 一样,但是使用 SIGKILL 信号 |
| close() |
关闭进程对象,释放所有资源 |
注意:start()、join()、is_alive()、terminate() 和 exitcode 应该只由进程的创造者调用。
asyncio 是一个通过 async/await 语法,实现并发编程的库。 asyncio 是很多异步框架的底层库,提供了高性能的网络服务、数据库连接服务和任务队列等。
1
2
3
4
5
6
7
8
|
import asyncio
async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')
asyncio.run(main())
|
运行协程并返回结果:
1
|
asyncio.run(coro, *, debug=None, loop_factory=None)
|
1
2
3
4
5
6
|
async def main():
await asyncio.sleep(1)
print('hello')
with asyncio.Runner() as runner:
runner.run(main())
|
可复用的带上下文管理的Runner对象:
1
|
class asyncio.Runner(*, debug=None, loop_factory=None)
|
对比:
| 特性 |
asyncio.run() |
asyncio.Runner().run() |
| Python 版本 |
3.7+ |
3.11+ |
| 事件循环管理 |
每次调用创建新循环 |
可复用同一个循环 |
| 嵌套运行 |
❌ 不允许 |
✅ 允许 |
| 资源管理 |
自动清理 |
需用 with 语句 |
| 适用场景 |
简单脚本 |
复杂应用、多次运行协程 |
| 性能 |
每次创建/销毁循环 |
可复用循环,更高效 |