在编程中,我们经常需要同时处理多个任务。Python 的 threading 模块让我们能够实现多线程编程,使程序可以同时执行多个操作。这对于提高程序效率,特别是处理需要等待的任务非常有用。
想象一下,你在厨房做饭。如果只有一个灶台,你只能等一道菜做完再做下一道。但如果有多个灶台,你可以同时煮汤、炒菜和蒸饭。多线程就像多个灶台,让程序能够同时处理多个任务。
在单线程程序中,任务是一个接一个执行的。如果一个任务需要等待(比如下载文件或读取数据),整个程序就会停下来等待。多线程程序可以在等待某个任务的同时,继续执行其他任务。
我们可以通过继承 Thread 类来创建线程:
import threading
class MyThread(threading.Thread):
def run(self):
print("线程开始执行")
# 这里写线程要执行的代码
for i in range(3):
print(f"执行步骤 {i}")
threading.Event().wait(0.5) # 模拟工作
print("线程执行结束")
# 创建并启动线程
thread = MyThread()
thread.start()
thread.join()
print("主线程结束")更简单的方法是直接使用 Thread 构造函数:
import threading
def my_task():
print("线程开始执行")
for i in range(3):
print(f"处理任务 {i}")
threading.Event().wait(0.5)
print("线程执行结束")
# 创建线程
thread = threading.Thread(target=my_task)
thread.start()
thread.join()
print("主线程结束")当多个线程同时工作时,可能会遇到一些问题。
多个线程同时修改同一个数据会导致问题。锁可以解决这个问题:
import threading
lock = threading.Lock()
balance = 100
def update_balance(amount):
global balance
with lock: # 自动获取和释放锁
# 这部分代码同一时间只能有一个线程执行
current = balance
threading.Event().wait(0.01) # 模拟处理时间
balance = current + amount
# 创建多个线程同时修改余额
threads = []
for i in range(10):
t = threading.Thread(target=update_balance, args=(10,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终余额: {balance}") # 应该是 200如果不使用锁,最终结果可能不是 200,因为多个线程可能同时读取和修改余额。
线程之间经常需要传递数据。队列(Queue)是线程安全的,适合用于线程间通信:
import threading
import queue
import time
def producer(q, name):
for i in range(5):
item = f"{name}生产的产品-{i}"
q.put(item)
print(f"生产者 {name}: {item}")
time.sleep(0.1)
def consumer(q, name):
while True:
try:
item = q.get(timeout=1)
print(f"消费者 {name}: 处理 {item}")
time.sleep(0.2)
q.task_done()
except queue.Empty:
break
# 创建队列
q = queue.Queue()
# 创建生产者和消费者线程
producers = [
threading.Thread(target=producer, args=(q, "A")),
threading.Thread(target=producer, args=(q, "B"))
]
consumers = [
threading.Thread(target=consumer, args=(q, "X")),
threading.Thread(target=consumer, args=(q, "Y"))
]
# 启动所有线程
for p in producers:
p.start()
for c in consumers:
c.start()
# 等待生产者完成
for p in producers:
p.join()
# 等待所有任务被处理
q.join()
print("所有任务完成")事件用于线程间的简单通信:
import threading
# 创建事件
start_event = threading.Event()
finished_event = threading.Event()
def worker():
print("工作线程等待开始信号...")
start_event.wait() # 等待事件
print("工作线程开始工作")
time.sleep(1) # 模拟工作
print("工作完成")
finished_event.set() # 发送完成信号
# 启动工作线程
threading.Thread(target=worker).start()
time.sleep(0.5) # 主线程等待
print("主线程发送开始信号")
start_event.set()
print("主线程等待工作完成")
finished_event.wait()
print("所有工作完成")信号量用于控制同时访问资源的线程数量:
import threading
import time
# 只允许3个线程同时访问
semaphore = threading.Semaphore(3)
def access_resource(thread_id):
with semaphore:
print(f"线程 {thread_id} 获得访问权限")
time.sleep(1) # 模拟资源使用
print(f"线程 {thread_id} 释放访问权限")
# 创建多个线程尝试访问资源
threads = []
for i in range(8):
t = threading.Thread(target=access_resource, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()定时器用于在指定时间后执行函数:
import threading
def reminder():
print("提醒:该休息一下了!")
# 5秒后执行提醒
timer = threading.Timer(5.0, reminder)
print("定时器已设置,5秒后提醒")
timer.start()
# 如果需要可以取消定时器
# timer.cancel()下面是一个实用的多线程网页下载器示例:
import threading
import requests
import time
class Downloader:
def __init__(self, max_workers=3):
self.semaphore = threading.Semaphore(max_workers)
self.results = {}
self.lock = threading.Lock()
def download_url(self, url, name):
with self.semaphore:
try:
print(f"开始下载 {name}")
response = requests.get(url, timeout=5)
with self.lock:
self.results[name] = len(response.content)
print(f"完成下载 {name}, 大小: {len(response.content)} 字节")
except Exception as e:
print(f"下载 {name} 失败: {e}")
def start_downloads(self, url_list):
threads = []
for name, url in url_list:
t = threading.Thread(target=self.download_url, args=(url, name))
threads.append(t)
t.start()
for t in threads:
t.join()
return self.results
# 使用示例
if __name__ == "__main__":
urls_to_download = [
("首页", "https://www.fly63.com"),
("文档", "https://www.fly63.com/docs"),
("示例", "https://www.fly63.com/examples")
]
downloader = Downloader(max_workers=2)
start_time = time.time()
results = downloader.start_downloads(urls_to_download)
end_time = time.time()
print(f"下载完成,耗时: {end_time - start_time:.2f} 秒")
print(f"结果: {results}")| 类/方法/属性 | 说明 | 示例 |
|---|---|---|
| threading.Thread | 线程类,用于创建和管理线程 | t = Thread(target=func, args=(1,)) |
| threading.Lock | 互斥锁(原始锁) | lock = Lock() |
| threading.RLock | 可重入锁(同一线程可多次获取) | rlock = RLock() |
| threading.Event | 事件对象,用于线程同步 | event = Event() |
| threading.Condition | 条件变量,用于复杂线程协调 | cond = Condition() |
| threading.Semaphore | 信号量,控制并发线程数 | sem = Semaphore(3) |
| threading.BoundedSemaphore | 有界信号量(防止计数超过初始值) | b_sem = BoundedSemaphore(2) |
| threading.Timer | 定时器线程,延迟执行 | timer = Timer(5.0, func) |
| threading.local | 线程局部数据(各线程独立存储) | local_data = threading.local() |
| 方法/属性 | 说明 | 示例 |
|---|---|---|
| start() | 启动线程 | t.start() |
| run() | 线程执行的方法(可重写) | 自定义类时覆盖此方法 |
| join(timeout=None) | 阻塞当前线程,直到目标线程结束 | t.join() |
| is_alive() | 检查线程是否在运行 | if t.is_alive(): |
| name | 线程名称(可修改) | t.name = "Worker-1" |
| daemon | 守护线程标志(主线程退出时自动结束) | t.daemon = True |
| ident | 线程标识符(未启动时为 None) | print(t.ident) |
| 方法 | 说明 | 示例 |
|---|---|---|
| acquire(blocking=True, timeout=-1) | 获取锁(阻塞或非阻塞) | lock.acquire() |
| release() | 释放锁 | lock.release() |
| locked() | 检查锁是否被占用 | if not lock.locked(): |
| 方法 | 说明 | 示例 |
|---|---|---|
| set() | 设置事件为真,唤醒所有等待线程 | event.set() |
| clear() | 重置事件为假 | event.clear() |
| wait(timeout=None) | 阻塞直到事件为真或超时 | event.wait(2.0) |
| is_set() | 检查事件状态 | if event.is_set(): |
| 方法 | 说明 | 示例 |
|---|---|---|
| wait(timeout=None) | 释放锁并阻塞,直到被通知或超时 | cond.wait() |
| notify(n=1) | 唤醒最多 n 个等待线程 | cond.notify(2) |
| notify_all() | 唤醒所有等待线程 | cond.notify_all() |
| 函数/属性 | 说明 | 示例 |
|---|---|---|
| threading.active_count() | 返回当前活跃线程数 | print(threading.active_count()) |
| threading.current_thread() | 返回当前线程对象 | print(threading.current_thread().name) |
| threading.enumerate() | 返回所有活跃线程的列表 | for t in threading.enumerate(): |
| threading.main_thread() | 返回主线程对象 | if threading.current_thread() is threading.main_thread(): |
| threading.get_ident() | 返回当前线程的标识符(Python 3.3+) | print(threading.get_ident()) |
Python 有一个叫做 GIL 的机制,它限制同一时间只能有一个线程执行 Python 代码。这意味着:
对于计算密集型任务(如数学计算、图像处理),多线程可能不会提高速度
对于 I/O 密集型任务(如文件读写、网络请求),多线程仍然很有用,因为线程在等待 I/O 时可以释放 GIL
死锁:多个线程互相等待对方释放锁
资源竞争:多个线程同时修改共享数据
线程过多:创建太多线程会消耗系统资源
使用 with lock 语句自动管理锁
优先使用队列进行线程间通信
合理设置线程数量
考虑使用 ThreadPoolExecutor 管理线程池
Python 的 threading 模块提供了强大的多线程编程能力。通过合理使用线程、锁、队列等工具,我们可以编写出高效、响应快的程序。记住要根据任务类型选择是否使用多线程,并始终注意线程安全问题。
多线程编程需要练习才能掌握。从简单例子开始,逐步尝试更复杂的场景,你会慢慢熟悉这种编程方式。
本文内容仅供个人学习/研究/参考使用,不构成任何决策建议或专业指导。分享/转载时请标明原文来源,同时请勿将内容用于商业售卖、虚假宣传等非学习用途哦~感谢您的理解与支持!