🎯 学习目标

  • 理解同步与异步的区别
  • 掌握async/await语法
  • 学会使用asyncio库
  • 了解异步在AI API调用中的应用

⏱️ 同步 vs 异步

同步执行

任务按顺序执行,一个完成后再执行下一个

# 同步请求API import requests import time def fetch_data(urls): results = [] for url in urls: response = requests.get(url) results.append(response.json()) return results # 如果每个请求1秒,10个URL需要10秒
效率低

异步执行

多个任务同时进行,不阻塞等待

# 异步请求API import aiohttp import asyncio async def fetch_data(urls): async with aiohttp.ClientSession() as session: tasks = [fetch_one(session, url) for url in urls] return await asyncio.gather(*tasks) async def fetch_one(session, url): async with session.get(url) as response: return await response.json() # 10个URL可能只需1-2秒
效率高

🔄 async/await语法

import asyncio # 定义异步函数 async def say_hello(name, delay): print(f"Hello {name}!") await asyncio.sleep(delay) # 模拟IO操作 print(f"Goodbye {name}!") return f"Done: {name}" # 运行异步函数 async def main(): # 顺序执行 result1 = await say_hello("Alice", 1) result2 = await say_hello("Bob", 1) # 总耗时: 2秒 # 并发执行 results = await asyncio.gather( say_hello("Alice", 1), say_hello("Bob", 1), say_hello("Charlie", 1) ) # 总耗时: 1秒(并发) print(results) # 运行事件循环 asyncio.run(main())

🌐 异步HTTP请求

调用AI API的最佳实践

import aiohttp import asyncio from typing import List class AsyncOpenAI: def __init__(self, api_key: str): self.api_key = api_key self.base_url = "https://api.openai.com/v1" async def chat_completion( self, session: aiohttp.ClientSession, messages: List[dict] ): async with session.post( f"{self.base_url}/chat/completions", headers={"Authorization": f"Bearer {self.api_key}"}, json={ "model": "gpt-3.5-turbo", "messages": messages } ) as response: return await response.json() async def batch_chat(self, prompts: List[str]): async with aiohttp.ClientSession() as session: tasks = [ self.chat_completion( session, [{"role": "user", "content": prompt}] ) for prompt in prompts ] return await asyncio.gather(*tasks) # 使用 client = AsyncOpenAI("your-api-key") results = asyncio.run(client.batch_chat([ "What is AI?", "Explain machine learning", "What is deep learning?" ]))

⚠️ 异步编程注意事项

⚠️
  • 不要阻塞事件循环:CPU密集型任务应该用多进程
  • 异常处理:使用try/except包装await调用
  • 超时控制:使用asyncio.wait_for设置超时
  • 并发限制:使用asyncio.Semaphore控制并发数
# 并发限制示例 async def limited_concurrency(urls, max_concurrent=5): semaphore = asyncio.Semaphore(max_concurrent) async def fetch_with_limit(session, url): async with semaphore: return await fetch_one(session, url) async with aiohttp.ClientSession() as session: tasks = [fetch_with_limit(session, url) for url in urls] return await asyncio.gather(*tasks)

📝 本节小结

  • • 异步编程适合IO密集型任务
  • • async/await是Python原生异步语法
  • • asyncio.gather实现并发执行
  • • AI API调用推荐使用异步方式