Python asyncio 异步编程模式深度解析

从事件循环到协程调度,全面解析 asyncio 的设计哲学与最佳实践。

引言

Python 的 asyncio 模块自 3.4 版本引入以来,已经成为高并发网络编程的事实标准。但许多开发者对 asyncio 的理解停留在 async/await 语法层面,对其底层机制和高级用法知之甚少。本文将深入 asyncio 的核心,从事件循环到协程调度,全面解析其设计哲学与最佳实践。

事件循环:一切的开始

事件循环是 asyncio 的心脏。理解事件循环的工作机制,是掌握 asyncio 的第一步。

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    print(f"当前事件循环: {loop}")

asyncio.run(main())

事件循环的核心职责:

  1. 调度协程:管理就绪协程的执行顺序
  2. 处理回调:执行延迟回调和定时任务
  3. 管理文件描述符:监听 socket 的可读/可写状态
  4. 跨线程通信:通过 call_soon_threadsafe 实现线程安全调用

协程与任务

协程的本质

async def 定义的函数返回一个协程对象,它本身不会执行,需要被事件循环调度。

async def greeting(name):
    await asyncio.sleep(1)
    return f"Hello, {name}!"

coro = greeting("Python")
result = asyncio.run(coro)

任务的创建与取消

任务(Task)是对协程的包装,使其可以在事件循环中并发执行。

async def worker(name, delay):
    await asyncio.sleep(delay)
    return f"{name} 完成"

async def main():
    task1 = asyncio.create_task(worker("A", 2))
    task2 = asyncio.create_task(worker("B", 1))
    results = await asyncio.gather(task1, task2)
    print(results)

asyncio.run(main())

优雅的任务取消

async def cancellable_task():
    try:
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("收到取消信号,正在清理...")
        raise

async def main():
    task = asyncio.create_task(cancellable_task())
    await asyncio.sleep(3)
    task.cancel()

asyncio.run(main())

同步原语

Lock

lock = asyncio.Lock()

async def increment():
    async with lock:
        current = counter
        await asyncio.sleep(0.1)
        counter = current + 1

Semaphore

限制并发数量的利器:

semaphore = asyncio.Semaphore(5)

async def fetch(url, session):
    async with semaphore:
        async with session.get(url) as resp:
            return await resp.text()

最佳实践

1. 避免阻塞事件循环

# 错误
time.sleep(5)  # 阻塞整个事件循环!

# 正确
await loop.run_in_executor(None, blocking_func)

2. 合理使用 gather 和 wait

# gather: 等待全部完成
tasks = [asyncio.create_task(some_io(i)) for i in range(10)]
results = await asyncio.gather(*tasks)

# wait: 更灵活的控制
done, pending = await asyncio.wait(
    tasks, return_when=asyncio.FIRST_COMPLETED
)

3. 上下文变量传递

from contextvars import ContextVar
request_id = ContextVar('request_id', default='unknown')

async def handle_request(req_id):
    request_id.set(req_id)
    # 后续代码中 request_id.get() 始终正确

总结

asyncio 是 Python 生态中最强大的并发工具之一。掌握事件循环、协程调度、同步原语和最佳实践,能让你构建出高性能、可维护的异步应用。记住核心原则:不要让事件循环等待,让它一直在调度。

← 回到文章列表