- 不要阻塞事件循环:CPU密集型任务应该用多进程
- 异常处理:使用try/except包装await调用
- 超时控制:使用asyncio.wait_for设置超时
- 并发限制:使用asyncio.Semaphore控制并发数
4.4 异步编程基础
高效处理并发任务
🎯 学习目标
- 理解同步与异步的区别
- 掌握async/await语法
- 学会使用asyncio库
- 了解异步在AI API调用中的应用
⏱️ 同步 vs 异步
同步执行
任务按顺序执行,一个完成后再执行下一个
# 同步请求API
import requests
import time
def fetch_data(urls):
results = []
for url in urls:
response = requests.get(url)
results.append(response.json())
return results
# 如果每个请求1秒,10个URL需要10秒
效率低
异步执行
多个任务同时进行,不阻塞等待
# 异步请求API
import aiohttp
import asyncio
async def fetch_data(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_one(session, url) for url in urls]
return await asyncio.gather(*tasks)
async def fetch_one(session, url):
async with session.get(url) as response:
return await response.json()
# 10个URL可能只需1-2秒
效率高
🔄 async/await语法
import asyncio
# 定义异步函数
async def say_hello(name, delay):
print(f"Hello {name}!")
await asyncio.sleep(delay) # 模拟IO操作
print(f"Goodbye {name}!")
return f"Done: {name}"
# 运行异步函数
async def main():
# 顺序执行
result1 = await say_hello("Alice", 1)
result2 = await say_hello("Bob", 1)
# 总耗时: 2秒
# 并发执行
results = await asyncio.gather(
say_hello("Alice", 1),
say_hello("Bob", 1),
say_hello("Charlie", 1)
)
# 总耗时: 1秒(并发)
print(results)
# 运行事件循环
asyncio.run(main())
🌐 异步HTTP请求
调用AI API的最佳实践
import aiohttp
import asyncio
from typing import List
class AsyncOpenAI:
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.openai.com/v1"
async def chat_completion(
self,
session: aiohttp.ClientSession,
messages: List[dict]
):
async with session.post(
f"{self.base_url}/chat/completions",
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"model": "gpt-3.5-turbo",
"messages": messages
}
) as response:
return await response.json()
async def batch_chat(self, prompts: List[str]):
async with aiohttp.ClientSession() as session:
tasks = [
self.chat_completion(
session,
[{"role": "user", "content": prompt}]
)
for prompt in prompts
]
return await asyncio.gather(*tasks)
# 使用
client = AsyncOpenAI("your-api-key")
results = asyncio.run(client.batch_chat([
"What is AI?",
"Explain machine learning",
"What is deep learning?"
]))
⚠️ 异步编程注意事项
⚠️
# 并发限制示例
async def limited_concurrency(urls, max_concurrent=5):
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_with_limit(session, url):
async with semaphore:
return await fetch_one(session, url)
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_limit(session, url) for url in urls]
return await asyncio.gather(*tasks)
📝 本节小结
✅
- • 异步编程适合IO密集型任务
- • async/await是Python原生异步语法
- • asyncio.gather实现并发执行
- • AI API调用推荐使用异步方式