在编程中,我们经常需要处理需要等待的操作,比如从网络下载数据、读写文件,或者与数据库通信。传统的编程方式会让程序在等待时停下来,直到操作完成。Python 的 asyncio 模块提供了一种不同的方法,让程序在等待时可以去做其他事情。
想象一下你在餐厅点餐。传统的同步方式就像你点完菜后一直站在柜台前等待,直到厨师做好菜才离开。而异步方式更像是你点完菜后拿到一个取餐号,然后可以去找座位、玩手机或者和朋友聊天,当菜准备好时再去取。
asyncio 就是 Python 中实现这种"取餐号"机制的模块。它让你能够编写在等待时不会阻塞整个程序的代码。
在传统的同步程序中,如果某个操作需要等待,整个程序就会停下来等待。比如:
# 传统的同步方式
import time
def download_data(url):
print(f"开始下载 {url}")
time.sleep(2) # 模拟网络延迟
print(f"下载完成 {url}")
return f"{url} 的数据"
# 顺序执行,总共需要6秒
start = time.time()
result1 = download_data("https://fly63.com/page1")
result2 = download_data("https://fly63.com/page2")
result3 = download_data("https://fly63.com/page3")
end = time.time()
print(f"总耗时: {end - start} 秒")使用 asyncio,我们可以在等待时执行其他任务:
# 异步方式
import asyncio
async def download_data(url):
print(f"开始下载 {url}")
await asyncio.sleep(2) # 模拟网络延迟,但不会阻塞
print(f"下载完成 {url}")
return f"{url} 的数据"
async def main():
start = time.time()
# 同时启动三个下载任务
task1 = asyncio.create_task(download_data("https://fly63.com/page1"))
task2 = asyncio.create_task(download_data("https://fly63.com/page2"))
task3 = asyncio.create_task(download_data("https://fly63.com/page3"))
# 等待所有任务完成
results = await asyncio.gather(task1, task2, task3)
end = time.time()
print(f"总耗时: {end - start} 秒")
print(f"结果: {results}")
asyncio.run(main())异步版本大约只需要2秒就能完成所有三个下载任务,因为它们几乎是同时进行的。
协程是 asyncio 的基础。你可以把它理解为可以暂停和恢复的函数。
import asyncio
async def my_coroutine():
print("开始执行")
await asyncio.sleep(1) # 暂停1秒
print("1秒后继续执行")
await asyncio.sleep(1) # 再暂停1秒
print("又过了1秒")
return "完成"
# 运行协程
asyncio.run(my_coroutine())创建协程很简单,只需要在函数定义前加上 async 关键字。在需要暂停等待的地方使用 await 关键字。
事件循环是 asyncio 的心脏。它负责调度和执行所有的协程。当你使用 asyncio.run() 时,Python 会自动创建和管理事件循环。
import asyncio
async def task1():
for i in range(3):
print(f"任务1: 第{i+1}次执行")
await asyncio.sleep(0.5)
async def task2():
for i in range(2):
print(f"任务2: 第{i+1}次执行")
await asyncio.sleep(0.8)
async def main():
# 同时运行两个任务
await asyncio.gather(task1(), task2())
asyncio.run(main())运行这个例子,你会看到两个任务交替执行,而不是一个完成后才开始另一个。
任务是对协程的包装,让协程可以在事件循环中独立运行。
import asyncio
async def count_down(name, seconds):
for i in range(seconds, 0, -1):
print(f"{name}: {i}秒")
await asyncio.sleep(1)
print(f"{name}: 完成!")
return f"{name}的结果"
async def main():
# 创建多个任务
task1 = asyncio.create_task(count_down("任务A", 3))
task2 = asyncio.create_task(count_down("任务B", 5))
task3 = asyncio.create_task(count_down("任务C", 2))
# 等待所有任务完成并获取结果
results = await asyncio.gather(task1, task2, task3)
print(f"所有任务完成: {results}")
asyncio.run(main())下面是一个实用的异步网页下载器:
import asyncio
import aiohttp # 需要安装: pip install aiohttp
async def download_page(session, url):
try:
print(f"开始下载: {url}")
async with session.get(url, timeout=10) as response:
content = await response.text()
print(f"下载完成: {url}, 长度: {len(content)}")
return len(content)
except Exception as e:
print(f"下载失败 {url}: {e}")
return 0
async def main():
urls = [
"https://www.fly63.com",
"https://www.fly63.com/python",
"https://www.fly63.com/javascript",
"https://www.fly63.com/css",
"https://www.fly63.com/html"
]
async with aiohttp.ClientSession() as session:
tasks = [download_page(session, url) for url in urls]
results = await asyncio.gather(*tasks)
total_size = sum(results)
print(f"总共下载 {len(urls)} 个页面,总大小: {total_size} 字符")
asyncio.run(main())使用异步队列实现生产者-消费者模式:
import asyncio
import random
async def producer(queue, name, count):
for i in range(count):
item = f"{name}-产品{i}"
await queue.put(item)
print(f"生产者 {name} 生产了: {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
# 发送结束信号
await queue.put(None)
async def consumer(queue, name):
while True:
item = await queue.get()
if item is None:
# 把结束信号放回队列,让其他消费者也能看到
await queue.put(None)
print(f"消费者 {name} 结束工作")
break
print(f"消费者 {name} 处理了: {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=5)
# 创建生产者和消费者
producers = [
asyncio.create_task(producer(queue, "A", 3)),
asyncio.create_task(producer(queue, "B", 2))
]
consumers = [
asyncio.create_task(consumer(queue, "X")),
asyncio.create_task(consumer(queue, "Y"))
]
# 等待所有生产者完成
await asyncio.gather(*producers)
# 等待队列中的所有项目被处理
await queue.join()
# 取消消费者任务
for consumer_task in consumers:
consumer_task.cancel()
asyncio.run(main())为异步操作设置超时时间:
import asyncio
async def slow_operation(seconds):
print(f"开始执行需要{seconds}秒的操作")
await asyncio.sleep(seconds)
return f"操作完成,耗时{seconds}秒"
async def main():
try:
# 如果操作超过3秒就超时
result = await asyncio.wait_for(slow_operation(5), timeout=3.0)
print(result)
except asyncio.TimeoutError:
print("操作超时!")
# 另一种方式:使用shield防止任务被取消
try:
result = await asyncio.wait_for(
asyncio.shield(slow_operation(5)),
timeout=3.0
)
print(result)
except asyncio.TimeoutError:
print("超时了,但任务仍在后台运行")
# 等待原始任务完成
await asyncio.sleep(2)
print("现在原始任务应该完成了")
asyncio.run(main())当多个协程需要访问共享资源时,使用锁来避免冲突:
import asyncio
class BankAccount:
def __init__(self, balance):
self.balance = balance
self.lock = asyncio.Lock()
async def withdraw(self, amount, user):
async with self.lock: # 确保同一时间只有一个协程可以修改余额
print(f"{user} 尝试取款 {amount}")
if self.balance >= amount:
await asyncio.sleep(0.1) # 模拟处理时间
self.balance -= amount
print(f"{user} 取款成功,余额: {self.balance}")
return True
else:
print(f"{user} 取款失败,余额不足")
return False
async def user_operation(account, user_name):
for _ in range(3):
success = await account.withdraw(10, user_name)
await asyncio.sleep(0.2)
async def main():
account = BankAccount(50)
# 多个用户同时操作同一个账户
await asyncio.gather(
user_operation(account, "用户A"),
user_operation(account, "用户B"),
user_operation(account, "用户C")
)
print(f"最终余额: {account.balance}")
asyncio.run(main())有时候我们需要在异步代码中使用同步库:
import asyncio
import time
def sync_blocking_operation():
"""这是一个同步的阻塞操作"""
time.sleep(2) # 这会阻塞整个事件循环!
return "同步操作完成"
async def async_wrapper():
"""将同步操作包装成异步"""
# 在线程池中运行同步函数,避免阻塞事件循环
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, sync_blocking_operation)
return result
async def main():
print("开始执行")
result = await async_wrapper()
print(f"结果: {result}")
# 同时执行其他异步任务
task1 = asyncio.create_task(async_wrapper())
task2 = asyncio.create_task(asyncio.sleep(1))
results = await asyncio.gather(task1, task2)
print("所有任务完成")
asyncio.run(main())正确处理异步操作中的错误:
import asyncio
async def might_fail_operation(name, success_chance):
await asyncio.sleep(0.5)
if success_chance < 0.5:
raise ValueError(f"{name} 操作失败!")
return f"{name} 操作成功"
async def main():
tasks = [
asyncio.create_task(might_fail_operation("任务A", 0.8)),
asyncio.create_task(might_fail_operation("任务B", 0.3)),
asyncio.create_task(might_fail_operation("任务C", 0.9))
]
# 使用return_exceptions=True让gather不因单个异常而停止
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务{i}出错: {result}")
else:
print(f"任务{i}成功: {result}")
asyncio.run(main())选择合适的场景:asyncio 最适合 I/O 密集型任务,如网络请求、文件操作等。
避免阻塞操作:不要在异步函数中使用 time.sleep() 这样的阻塞调用。
合理控制并发数量:使用信号量限制同时运行的任务数量:
import asyncio
async def limited_operation(semaphore, name):
async with semaphore:
print(f"{name} 开始执行")
await asyncio.sleep(1)
print(f"{name} 完成")
return f"{name} 的结果"
async def main():
# 限制同时只能有2个操作运行
semaphore = asyncio.Semaphore(2)
tasks = []
for i in range(6):
task = asyncio.create_task(limited_operation(semaphore, f"任务{i}"))
tasks.append(task)
results = await asyncio.gather(*tasks)
print("所有任务完成")
asyncio.run(main())及时清理资源:使用 async with 语句确保资源被正确释放。
| 方法/函数 | 说明 | 示例 |
|---|---|---|
| asyncio.run(coro) | 运行异步主函数(Python 3.7+) | asyncio.run(main()) |
| asyncio.create_task(coro) | 创建任务并加入事件循环 | task = asyncio.create_task(fetch_data()) |
| asyncio.gather(*coros) | 并发运行多个协程 | await asyncio.gather(task1, task2) |
| asyncio.sleep(delay) | 异步等待(非阻塞) | await asyncio.sleep(1) |
| asyncio.wait(coros) | 控制任务完成方式 | done, pending = await asyncio.wait([task1, task2]) |
| 方法 | 说明 | 示例 |
|---|---|---|
| loop.run_until_complete(future) | 运行直到任务完成 | loop.run_until_complete(main()) |
| loop.run_forever() | 永久运行事件循环 | loop.run_forever() |
| loop.stop() | 停止事件循环 | loop.stop() |
| loop.close() | 关闭事件循环 | loop.close() |
| loop.call_soon(callback) | 安排回调函数立即执行 | loop.call_soon(print, "Hello") |
| loop.call_later(delay, callback) | 延迟执行回调 | loop.call_later(5, callback) |
| 方法/装饰器 | 说明 | 示例 |
|---|---|---|
| @asyncio.coroutine | 协程装饰器(旧版,Python 3.4-3.7) | @asyncio.coroutine def old_coro(): |
| async def | 定义协程(Python 3.5+) | async def fetch(): |
| task.cancel() | 取消任务 | task.cancel() |
| task.done() | 检查任务是否完成 | if task.done(): |
| task.result() | 获取任务结果(需任务完成) | data = task.result() |
| 类 | 说明 | 示例 |
|---|---|---|
| asyncio.Lock() | 异步互斥锁 | lock = asyncio.Lock() async with lock: |
| asyncio.Event() | 事件通知 | event = asyncio.Event() await event.wait() |
| asyncio.Queue() | 异步队列 | queue = asyncio.Queue() await queue.put(item) |
| asyncio.Semaphore() | 信号量 | sem = asyncio.Semaphore(5) async with sem: |
| 方法/类 | 说明 | 示例 |
|---|---|---|
| asyncio.open_connection() | 建立TCP连接 | reader, writer = await asyncio.open_connection('host', 80) |
| asyncio.start_server() | 创建TCP服务器 | server = await asyncio.start_server(handle, '0.0.0.0', 8888) |
| asyncio.create_subprocess_exec() | 创建子进程 | proc = await asyncio.create_subprocess_exec('ls') |
| 方法 | 说明 | 示例 |
|---|---|---|
| asyncio.current_task() | 获取当前任务 | task = asyncio.current_task() |
| asyncio.all_tasks() | 获取所有任务 | tasks = asyncio.all_tasks() |
| asyncio.shield(coro) | 保护任务不被取消 | await asyncio.shield(critical_task) |
| asyncio.wait_for(coro, timeout) | 带超时的等待 | try: await asyncio.wait_for(task, 5) |
asyncio 是 Python 中强大的异步编程工具,特别适合处理大量的 I/O 操作。通过使用 async/await 语法,我们可以编写出既高效又易于理解的并发代码。
记住这些要点:
使用 async def 定义协程
使用 await 来等待异步操作
使用 asyncio.run() 运行主协程
使用 asyncio.create_task() 创建并发任务
使用 asyncio.gather() 等待多个任务完成
fly63 网站提供了更多详细的异步编程教程和实际案例,帮助你更好地掌握这个强大的工具。开始尝试在自己的项目中使用 asyncio,你会发现它能让你的程序运行得更快、更高效。
本文内容仅供个人学习/研究/参考使用,不构成任何决策建议或专业指导。分享/转载时请标明原文来源,同时请勿将内容用于商业售卖、虚假宣传等非学习用途哦~感谢您的理解与支持!