Announcement

👇Official Account👇

Welcome to join the group & private message

Article first/tail QR code

Skip to content

Python Asyncio高级模式 - 从入门到精通的异步编程指南

1. 异步编程的演进与Asyncio

Python的异步编程经历了从yieldyield fromasync/await的演进过程。Python 3.5引入的async/await语法和asyncio库,标志着Python异步编程进入了成熟阶段。

1.1 为什么需要Asyncio?

在高并发场景下,传统的同步编程模型会导致大量的CPU等待时间,而异步编程可以让CPU在等待IO操作时处理其他任务,从而提高系统的吞吐量。

2. 异步上下文管理器

异步上下文管理器允许我们在异步代码中使用async with语句,用于资源的自动管理。

2.1 基本实现

python
import asyncio

class AsyncContextManager:
    async def __aenter__(self):
        print("进入异步上下文")
        await asyncio.sleep(1)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("退出异步上下文")
        await asyncio.sleep(1)

async def main():
    async with AsyncContextManager() as ctx:
        print("使用异步上下文")

asyncio.run(main())

2.2 实用案例:异步数据库连接池

python
import asyncio
import aiomysql

class AsyncMySQLPool:
    def __init__(self, **kwargs):
        self.pool = None
        self.config = kwargs
    
    async def __aenter__(self):
        self.pool = await aiomysql.create_pool(**self.config)
        return self.pool
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.pool.close()
        await self.pool.wait_closed()

# 使用示例
async def query_db():
    config = {
        'host': 'localhost',
        'port': 3306,
        'user': 'root',
        'password': 'password',
        'db': 'test',
        'minsize': 5,
        'maxsize': 20
    }
    
    async with AsyncMySQLPool(**config) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 1")
                result = await cur.fetchone()
                print(f"查询结果: {result}")

asyncio.run(query_db())

3. 异步迭代器

异步迭代器允许我们在异步代码中使用async for语句,用于异步遍历数据。

3.1 基本实现

python
class AsyncIterator:
    def __init__(self, start, end):
        self.start = start
        self.end = end
    
    def __aiter__(self):
        self.current = self.start
        return self
    
    async def __anext__(self):
        if self.current >= self.end:
            raise StopAsyncIteration
        
        value = self.current
        self.current += 1
        await asyncio.sleep(0.5)  # 模拟异步操作
        return value

async def main():
    async for num in AsyncIterator(0, 5):
        print(f"迭代值: {num}")

asyncio.run(main())

3.2 实用案例:异步数据生成器

python
async def async_range(start, end, delay=0.1):
    current = start
    while current < end:
        yield current
        current += 1
        await asyncio.sleep(delay)

async def main():
    async for num in async_range(0, 10, 0.2):
        print(f"异步生成: {num}")

asyncio.run(main())

4. 任务组(Task Groups)

Python 3.11引入了任务组(Task Groups),提供了一种更安全、更简洁的方式来管理多个并发任务。

4.1 基本使用

python
import asyncio

async def task(name, duration):
    print(f"任务 {name} 开始")
    await asyncio.sleep(duration)
    print(f"任务 {name} 完成")
    return f"结果 {name}"

async def main():
    async with asyncio.TaskGroup() as tg:
        # 创建多个任务
        task1 = tg.create_task(task("A", 1))
        task2 = tg.create_task(task("B", 2))
        task3 = tg.create_task(task("C", 1.5))
    
    # 任务组完成后,所有任务都已完成
    print(f"任务1结果: {task1.result()}")
    print(f"任务2结果: {task2.result()}")
    print(f"任务3结果: {task3.result()}")

asyncio.run(main())

4.2 错误处理

python
async def failing_task():
    print("失败任务开始")
    await asyncio.sleep(0.5)
    raise ValueError("任务失败")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task("成功", 1))
            tg.create_task(failing_task())
            tg.create_task(task("另一个成功", 1.5))
    except Exception as e:
        print(f"捕获到异常: {e}")
    
    print("主程序继续执行")

asyncio.run(main())

5. 异步队列(Asyncio Queue)

异步队列用于在异步任务之间安全地传递数据,是实现生产者-消费者模式的理想选择。

5.1 基本使用

python
import asyncio

async def producer(queue, n):
    for i in range(n):
        item = f"项目 {i}"
        await queue.put(item)
        print(f"生产者: 放入 {item}, 队列大小: {queue.qsize()}")
        await asyncio.sleep(0.2)
    
    await queue.put(None)  # 发送结束信号
    print("生产者: 完成")

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # 传递结束信号
            print("消费者: 收到结束信号")
            break
        
        print(f"消费者: 处理 {item}, 队列大小: {queue.qsize()}")
        await asyncio.sleep(0.5)  # 模拟处理时间
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    
    # 创建任务
    producer_task = asyncio.create_task(producer(queue, 10))
    consumer_task = asyncio.create_task(consumer(queue))
    
    # 等待完成
    await producer_task
    await queue.join()  # 等待队列清空
    await consumer_task
    
    print("所有任务完成")

asyncio.run(main())

5.2 多生产者多消费者

python
async def main():
    queue = asyncio.Queue(maxsize=10)
    
    # 创建多个生产者和消费者
    producers = [asyncio.create_task(producer(queue, 5)) for _ in range(3)]
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(2)]
    
    # 等待所有生产者完成
    await asyncio.gather(*producers)
    
    # 发送结束信号
    await queue.put(None)
    
    # 等待所有消费者完成
    await asyncio.gather(*consumers)
    
    print("所有任务完成")

asyncio.run(main())

6. 异步锁与信号量

在并发编程中,锁和信号量用于保护共享资源,防止竞态条件。

6.1 异步锁(Lock)

python
import asyncio

async def worker(name, lock, shared_counter):
    for _ in range(5):
        # 获取锁
        async with lock:
            # 临界区
            shared_counter[0] += 1
            print(f"{name}: 计数器 = {shared_counter[0]}")
        
        await asyncio.sleep(0.1)

async def main():
    shared_counter = [0]
    lock = asyncio.Lock()
    
    # 创建多个工作者
    workers = [asyncio.create_task(worker(f"工作者{i}", lock, shared_counter)) for i in range(3)]
    
    await asyncio.gather(*workers)
    
    print(f"最终计数器值: {shared_counter[0]}")

asyncio.run(main())

6.2 异步信号量(Semaphore)

python
import asyncio

async def worker(name, semaphore, resource_id):
    async with semaphore:
        print(f"{name}: 开始使用资源 {resource_id}")
        await asyncio.sleep(1)  # 模拟资源使用
        print(f"{name}: 释放资源 {resource_id}")

async def main():
    # 限制同时只能使用2个资源
    semaphore = asyncio.Semaphore(2)
    
    # 创建多个工作者
    workers = [asyncio.create_task(worker(f"工作者{i}", semaphore, i % 2)) for i in range(5)]
    
    await asyncio.gather(*workers)
    
    print("所有任务完成")

asyncio.run(main())

7. 异步超时处理

使用asyncio.timeout上下文管理器可以为异步操作设置超时时间。

7.1 基本使用

python
import asyncio

async def slow_operation():
    print("开始慢速操作")
    await asyncio.sleep(3)
    print("慢速操作完成")
    return "成功"

async def main():
    try:
        async with asyncio.timeout(2):
            result = await slow_operation()
            print(f"结果: {result}")
    except asyncio.TimeoutError:
        print("操作超时")
    
    print("主程序继续执行")

asyncio.run(main())

7.2 超时装饰器

python
import asyncio
from functools import wraps

def async_timeout(seconds):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                async with asyncio.timeout(seconds):
                    return await func(*args, **kwargs)
            except asyncio.TimeoutError:
                raise TimeoutError(f"函数 {func.__name__} 执行超时")
        return wrapper
    return decorator

@async_timeout(2)
async def slow_function():
    await asyncio.sleep(3)
    return "完成"

async def main():
    try:
        result = await slow_function()
        print(f"结果: {result}")
    except TimeoutError as e:
        print(f"捕获到超时: {e}")

asyncio.run(main())

8. 异步事件(Event)

异步事件用于在任务之间发送信号,实现同步。

python
import asyncio

async def waiter(event, name):
    print(f"{name}: 等待事件")
    await event.wait()
    print(f"{name}: 事件已触发")

async def setter(event):
    print("设置者: 准备触发事件")
    await asyncio.sleep(2)
    print("设置者: 触发事件")
    event.set()
    
    # 重置事件
    await asyncio.sleep(1)
    print("设置者: 重置事件")
    event.clear()

async def main():
    event = asyncio.Event()
    
    # 创建等待者任务
    waiter1 = asyncio.create_task(waiter(event, "等待者1"))
    waiter2 = asyncio.create_task(waiter(event, "等待者2"))
    
    # 创建设置者任务
    setter_task = asyncio.create_task(setter(event))
    
    # 等待所有任务完成
    await asyncio.gather(waiter1, waiter2, setter_task)
    
    print("所有任务完成")

asyncio.run(main())

9. 异步条件(Condition)

异步条件结合了锁和事件的功能,允许任务等待特定条件满足。

python
import asyncio

async def consumer(condition, queue):
    async with condition:
        while queue.empty():
            print("消费者: 等待生产者生产")
            await condition.wait()
        
        item = queue.get()
        print(f"消费者: 消费 {item}")

async def producer(condition, queue):
    for i in range(3):
        await asyncio.sleep(1)
        queue.put(f"产品 {i}")
        print(f"生产者: 生产产品 {i}")
        
        async with condition:
            condition.notify_all()

async def main():
    queue = asyncio.Queue()
    condition = asyncio.Condition()
    
    # 创建任务
    consumer_task = asyncio.create_task(consumer(condition, queue))
    producer_task = asyncio.create_task(producer(condition, queue))
    
    await asyncio.gather(consumer_task, producer_task)
    
    print("所有任务完成")

asyncio.run(main())

10. 总结与最佳实践

10.1 异步编程最佳实践

  1. 避免阻塞调用:在异步代码中避免使用阻塞IO操作,如time.sleep()requests.get()等,应使用对应的异步版本。

  2. 合理使用任务组:Python 3.11+推荐使用TaskGroup管理多个任务,更安全可靠。

  3. 使用异步上下文管理器:自动管理资源,避免资源泄漏。

  4. 合理设置超时:为网络请求等外部操作设置超时,避免无限等待。

  5. 使用异步队列实现生产者-消费者:安全高效地在任务间传递数据。

  6. 避免过度并发:使用信号量限制并发数,防止系统资源耗尽。

  7. 合理设计异步API:为库和服务设计良好的异步接口。

  8. 使用类型提示:提高代码的可读性和可维护性。

10.2 性能优化建议

  1. 减少任务创建开销:对于频繁执行的小任务,考虑复用任务或使用其他并发模型。

  2. 合理设置队列大小:避免队列过大导致内存占用过高。

  3. 使用异步IO库:选择成熟的异步IO库,如aiohttpaiomysqlasyncpg等。

  4. 避免不必要的上下文切换:减少锁的持有时间,优化临界区代码。

  5. 使用Profiler分析性能:使用asyncio.run_coroutine_threadsafe()cProfile分析异步代码性能。

11. 结语

Python Asyncio提供了强大的异步编程能力,通过掌握其高级模式,可以构建高性能、高并发的应用程序。随着Python 3.11+引入的新特性,如任务组,异步编程变得更加安全和简洁。

在实际开发中,应根据具体需求选择合适的异步模式,结合最佳实践,编写出高效、可靠的异步代码。异步编程是现代Python开发的重要组成部分,掌握它将使你在构建高性能应用时如虎添翼。

希望本文对你理解和使用Python Asyncio有所帮助,祝你在异步编程的道路上越走越远!

上次更新于: