Python3入门指南Python语言的特点和实际应用Python3环境搭建配置VSCode进行Python开发Python基础语法Python变量与数据类型Python数据类型转换Python解释器使用Python注释使用Python运算符Python数字类型Python字符串操作Python列表操作Python元组使用Python字典使用Python集合使用Python条件控制详解Python循环语句详解Python编程入门实践Python推导式详解Python迭代器和生成器Python with语句详解Python函数详解Python lambda(匿名函数)Python装饰器Python数据结构Python模块和包使用Python中__name__和__main__的用法Python输入输出:从基础到文件操作Python文件操作Python OS模块使用Python错误和异常处理Python面向对象编程Python命名空间和作用域Python虚拟环境:venv详细教程Python类型注解Python标准库常用模块Python正则表达式Python CGI编程Python MySQL(mysql-connector驱动)Python MySQL(PyMySQL驱动)Python网络编程Python发送邮件Python多线程编程Python XML解析Python JSON解析Python日期和时间处理Python操作MongoDBPython urllib库使用Python uWSGI 安装与配置Python pip包管理工具Python operator模块Python math模块Python requests模块HTTP请求Python random模块Python OpenAI库Python AI绘画制作Python statistics模块Python hashlib模块:哈希加密Python量化交易Python pyecharts数据可视化Python Selenium网页自动化Python BeautifulSoup网页数据提取Python Scrapy爬虫框架Python Markdown转HTMLPython sys模块Python Pickle模块:数据存储Python subprocess模块Python queue队列模块Python StringIO内存文件操作Python logging日志记录Python datetime日期时间处理Python re正则表达式Python csv表格数据处理Python threading 多线程编程Python asyncio 异步编程Python PyQt 图形界面开发Python 应用方向和常用库框架

Python asyncio 异步编程

在编程中,我们经常需要处理需要等待的操作,比如从网络下载数据、读写文件,或者与数据库通信。传统的编程方式会让程序在等待时停下来,直到操作完成。Python 的 asyncio 模块提供了一种不同的方法,让程序在等待时可以去做其他事情。


什么是异步编程?

想象一下你在餐厅点餐。传统的同步方式就像你点完菜后一直站在柜台前等待,直到厨师做好菜才离开。而异步方式更像是你点完菜后拿到一个取餐号,然后可以去找座位、玩手机或者和朋友聊天,当菜准备好时再去取。

asyncio 就是 Python 中实现这种"取餐号"机制的模块。它让你能够编写在等待时不会阻塞整个程序的代码


为什么需要 asyncio?

在传统的同步程序中,如果某个操作需要等待,整个程序就会停下来等待。比如:

# 传统的同步方式
import time

def download_data(url):
    print(f"开始下载 {url}")
    time.sleep(2)  # 模拟网络延迟
    print(f"下载完成 {url}")
    return f"{url} 的数据"

# 顺序执行,总共需要6秒
start = time.time()
result1 = download_data("https://fly63.com/page1")
result2 = download_data("https://fly63.com/page2") 
result3 = download_data("https://fly63.com/page3")
end = time.time()
print(f"总耗时: {end - start} 秒")

使用 asyncio,我们可以在等待时执行其他任务:

# 异步方式
import asyncio

async def download_data(url):
    print(f"开始下载 {url}")
    await asyncio.sleep(2)  # 模拟网络延迟,但不会阻塞
    print(f"下载完成 {url}")
    return f"{url} 的数据"

async def main():
    start = time.time()
    # 同时启动三个下载任务
    task1 = asyncio.create_task(download_data("https://fly63.com/page1"))
    task2 = asyncio.create_task(download_data("https://fly63.com/page2"))
    task3 = asyncio.create_task(download_data("https://fly63.com/page3"))
    
    # 等待所有任务完成
    results = await asyncio.gather(task1, task2, task3)
    end = time.time()
    print(f"总耗时: {end - start} 秒")
    print(f"结果: {results}")

asyncio.run(main())

异步版本大约只需要2秒就能完成所有三个下载任务,因为它们几乎是同时进行的。


asyncio 核心概念

协程(Coroutine)

协程是 asyncio 的基础。你可以把它理解为可以暂停和恢复的函数

import asyncio

async def my_coroutine():
    print("开始执行")
    await asyncio.sleep(1)  # 暂停1秒
    print("1秒后继续执行")
    await asyncio.sleep(1)  # 再暂停1秒
    print("又过了1秒")
    return "完成"

# 运行协程
asyncio.run(my_coroutine())

创建协程很简单,只需要在函数定义前加上 async 关键字。在需要暂停等待的地方使用 await 关键字。

事件循环(Event Loop)

事件循环是 asyncio 的心脏。它负责调度和执行所有的协程。当你使用 asyncio.run() 时,Python 会自动创建和管理事件循环。

import asyncio

async def task1():
    for i in range(3):
        print(f"任务1: 第{i+1}次执行")
        await asyncio.sleep(0.5)

async def task2():
    for i in range(2):
        print(f"任务2: 第{i+1}次执行") 
        await asyncio.sleep(0.8)

async def main():
    # 同时运行两个任务
    await asyncio.gather(task1(), task2())

asyncio.run(main())

运行这个例子,你会看到两个任务交替执行,而不是一个完成后才开始另一个。

任务(Task)

任务是对协程的包装,让协程可以在事件循环中独立运行。

import asyncio

async def count_down(name, seconds):
    for i in range(seconds, 0, -1):
        print(f"{name}: {i}秒")
        await asyncio.sleep(1)
    print(f"{name}: 完成!")
    return f"{name}的结果"

async def main():
    # 创建多个任务
    task1 = asyncio.create_task(count_down("任务A", 3))
    task2 = asyncio.create_task(count_down("任务B", 5))
    task3 = asyncio.create_task(count_down("任务C", 2))
    
    # 等待所有任务完成并获取结果
    results = await asyncio.gather(task1, task2, task3)
    print(f"所有任务完成: {results}")

asyncio.run(main())


实际应用示例

异步网页下载器

下面是一个实用的异步网页下载器:

import asyncio
import aiohttp  # 需要安装: pip install aiohttp

async def download_page(session, url):
    try:
        print(f"开始下载: {url}")
        async with session.get(url, timeout=10) as response:
            content = await response.text()
            print(f"下载完成: {url}, 长度: {len(content)}")
            return len(content)
    except Exception as e:
        print(f"下载失败 {url}: {e}")
        return 0

async def main():
    urls = [
        "https://www.fly63.com",
        "https://www.fly63.com/python",
        "https://www.fly63.com/javascript", 
        "https://www.fly63.com/css",
        "https://www.fly63.com/html"
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [download_page(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        total_size = sum(results)
        print(f"总共下载 {len(urls)} 个页面,总大小: {total_size} 字符")

asyncio.run(main())

生产者-消费者模式

使用异步队列实现生产者-消费者模式:

import asyncio
import random

async def producer(queue, name, count):
    for i in range(count):
        item = f"{name}-产品{i}"
        await queue.put(item)
        print(f"生产者 {name} 生产了: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
    
    # 发送结束信号
    await queue.put(None)

async def consumer(queue, name):
    while True:
        item = await queue.get()
        if item is None:
            # 把结束信号放回队列,让其他消费者也能看到
            await queue.put(None)
            print(f"消费者 {name} 结束工作")
            break
            
        print(f"消费者 {name} 处理了: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    
    # 创建生产者和消费者
    producers = [
        asyncio.create_task(producer(queue, "A", 3)),
        asyncio.create_task(producer(queue, "B", 2))
    ]
    
    consumers = [
        asyncio.create_task(consumer(queue, "X")),
        asyncio.create_task(consumer(queue, "Y"))
    ]
    
    # 等待所有生产者完成
    await asyncio.gather(*producers)
    
    # 等待队列中的所有项目被处理
    await queue.join()
    
    # 取消消费者任务
    for consumer_task in consumers:
        consumer_task.cancel()

asyncio.run(main())


高级特性

超时控制

为异步操作设置超时时间:

import asyncio

async def slow_operation(seconds):
    print(f"开始执行需要{seconds}秒的操作")
    await asyncio.sleep(seconds)
    return f"操作完成,耗时{seconds}秒"

async def main():
    try:
        # 如果操作超过3秒就超时
        result = await asyncio.wait_for(slow_operation(5), timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print("操作超时!")
    
    # 另一种方式:使用shield防止任务被取消
    try:
        result = await asyncio.wait_for(
            asyncio.shield(slow_operation(5)), 
            timeout=3.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("超时了,但任务仍在后台运行")
        # 等待原始任务完成
        await asyncio.sleep(2)
        print("现在原始任务应该完成了")

asyncio.run(main())

异步锁

当多个协程需要访问共享资源时,使用锁来避免冲突:

import asyncio

class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self.lock = asyncio.Lock()
    
    async def withdraw(self, amount, user):
        async with self.lock:  # 确保同一时间只有一个协程可以修改余额
            print(f"{user} 尝试取款 {amount}")
            if self.balance >= amount:
                await asyncio.sleep(0.1)  # 模拟处理时间
                self.balance -= amount
                print(f"{user} 取款成功,余额: {self.balance}")
                return True
            else:
                print(f"{user} 取款失败,余额不足")
                return False

async def user_operation(account, user_name):
    for _ in range(3):
        success = await account.withdraw(10, user_name)
        await asyncio.sleep(0.2)

async def main():
    account = BankAccount(50)
    
    # 多个用户同时操作同一个账户
    await asyncio.gather(
        user_operation(account, "用户A"),
        user_operation(account, "用户B"),
        user_operation(account, "用户C")
    )
    
    print(f"最终余额: {account.balance}")

asyncio.run(main())


常见问题与解决方案

1. 在异步函数中调用同步函数

有时候我们需要在异步代码中使用同步库:

import asyncio
import time

def sync_blocking_operation():
    """这是一个同步的阻塞操作"""
    time.sleep(2)  # 这会阻塞整个事件循环!
    return "同步操作完成"

async def async_wrapper():
    """将同步操作包装成异步"""
    # 在线程池中运行同步函数,避免阻塞事件循环
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, sync_blocking_operation)
    return result

async def main():
    print("开始执行")
    result = await async_wrapper()
    print(f"结果: {result}")
    
    # 同时执行其他异步任务
    task1 = asyncio.create_task(async_wrapper())
    task2 = asyncio.create_task(asyncio.sleep(1))
    
    results = await asyncio.gather(task1, task2)
    print("所有任务完成")

asyncio.run(main())

2. 错误处理

正确处理异步操作中的错误:

import asyncio

async def might_fail_operation(name, success_chance):
    await asyncio.sleep(0.5)
    if success_chance < 0.5:
        raise ValueError(f"{name} 操作失败!")
    return f"{name} 操作成功"

async def main():
    tasks = [
        asyncio.create_task(might_fail_operation("任务A", 0.8)),
        asyncio.create_task(might_fail_operation("任务B", 0.3)),
        asyncio.create_task(might_fail_operation("任务C", 0.9))
    ]
    
    # 使用return_exceptions=True让gather不因单个异常而停止
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务{i}出错: {result}")
        else:
            print(f"任务{i}成功: {result}")

asyncio.run(main())


最佳实践

  1. 选择合适的场景:asyncio 最适合 I/O 密集型任务,如网络请求、文件操作等。

  2. 避免阻塞操作:不要在异步函数中使用 time.sleep() 这样的阻塞调用。

  3. 合理控制并发数量:使用信号量限制同时运行的任务数量:

import asyncio

async def limited_operation(semaphore, name):
    async with semaphore:
        print(f"{name} 开始执行")
        await asyncio.sleep(1)
        print(f"{name} 完成")
        return f"{name} 的结果"

async def main():
    # 限制同时只能有2个操作运行
    semaphore = asyncio.Semaphore(2)
    
    tasks = []
    for i in range(6):
        task = asyncio.create_task(limited_operation(semaphore, f"任务{i}"))
        tasks.append(task)
    
    results = await asyncio.gather(*tasks)
    print("所有任务完成")

asyncio.run(main())
  1. 及时清理资源:使用 async with 语句确保资源被正确释放。


常用类、方法和函数

1. 核心函数

方法/函数说明示例
asyncio.run(coro)运行异步主函数(Python 3.7+)asyncio.run(main())
asyncio.create_task(coro)创建任务并加入事件循环task = asyncio.create_task(fetch_data())
asyncio.gather(*coros)并发运行多个协程await asyncio.gather(task1, task2)
asyncio.sleep(delay)异步等待(非阻塞)await asyncio.sleep(1)
asyncio.wait(coros)控制任务完成方式done, pending = await asyncio.wait([task1, task2])

2. 事件循环(Event Loop)

方法说明示例
loop.run_until_complete(future)运行直到任务完成loop.run_until_complete(main())
loop.run_forever()永久运行事件循环loop.run_forever()
loop.stop()停止事件循环loop.stop()
loop.close()关闭事件循环loop.close()
loop.call_soon(callback)安排回调函数立即执行loop.call_soon(print, "Hello")
loop.call_later(delay, callback)延迟执行回调loop.call_later(5, callback)

3. 协程(Coroutine)与任务(Task)

方法/装饰器说明示例
@asyncio.coroutine协程装饰器(旧版,Python 3.4-3.7)@asyncio.coroutine
def old_coro():
async def定义协程(Python 3.5+)async def fetch():
task.cancel()取消任务task.cancel()
task.done()检查任务是否完成if task.done():
task.result()获取任务结果(需任务完成)data = task.result()

4. 同步原语(类似threading)

说明示例
asyncio.Lock()异步互斥锁lock = asyncio.Lock()
async with lock:
asyncio.Event()事件通知event = asyncio.Event()
await event.wait()
asyncio.Queue()异步队列queue = asyncio.Queue()
await queue.put(item)
asyncio.Semaphore()信号量sem = asyncio.Semaphore(5)
async with sem:

5. 网络与子进程

方法/类说明示例
asyncio.open_connection()建立TCP连接reader, writer = await asyncio.open_connection('host', 80)
asyncio.start_server()创建TCP服务器server = await asyncio.start_server(handle, '0.0.0.0', 8888)
asyncio.create_subprocess_exec()创建子进程proc = await asyncio.create_subprocess_exec('ls')

6. 实用工具

方法说明示例
asyncio.current_task()获取当前任务task = asyncio.current_task()
asyncio.all_tasks()获取所有任务tasks = asyncio.all_tasks()
asyncio.shield(coro)保护任务不被取消await asyncio.shield(critical_task)
asyncio.wait_for(coro, timeout)带超时的等待try: await asyncio.wait_for(task, 5)


总结

asyncio 是 Python 中强大的异步编程工具,特别适合处理大量的 I/O 操作。通过使用 async/await 语法,我们可以编写出既高效又易于理解的并发代码。

记住这些要点:

  • 使用 async def 定义协程

  • 使用 await 来等待异步操作

  • 使用 asyncio.run() 运行主协程

  • 使用 asyncio.create_task() 创建并发任务

  • 使用 asyncio.gather() 等待多个任务完成

fly63 网站提供了更多详细的异步编程教程和实际案例,帮助你更好地掌握这个强大的工具。开始尝试在自己的项目中使用 asyncio,你会发现它能让你的程序运行得更快、更高效。

本文内容仅供个人学习/研究/参考使用,不构成任何决策建议或专业指导。分享/转载时请标明原文来源,同时请勿将内容用于商业售卖、虚假宣传等非学习用途哦~感谢您的理解与支持!

链接: https://fly63.com/course/36_2147

目录选择