安全建议
1. 永远不要在代码中硬编码API密钥
2. 使用环境变量或配置文件存储密钥
3. 定期轮换API密钥
4. 使用HTTPS进行所有API调用
5. 实施请求频率限制
6. 记录所有API调用日志
7. 及时撤销不用的API密钥
数据采集实战
API调用是量化交易获取数据的核心方式,它通过标准化的接口访问金融数据服务。本节将详细介绍API调用的各种认证机制和最佳实践。
import requests
import os
# Read the API key from the environment instead of hard-coding it.
API_KEY = os.getenv('API_KEY')

# Endpoint shared by the three examples below. The third example referenced
# ``url`` without ever defining it, which raised NameError.
url = 'https://api.example.com/stocks/AAPL'

# Option 1: pass the key as a URL query parameter.
response = requests.get(
    url,
    params={'apikey': API_KEY}
)

# Option 2: pass the key in a custom request header.
headers = {
    'X-API-Key': API_KEY,
    'Content-Type': 'application/json'
}
response = requests.get(
    url,
    headers=headers
)

# Option 3: pass the key as a Bearer token in the Authorization header.
headers = {
    'Authorization': f'Bearer {API_KEY}'
}
response = requests.get(url, headers=headers)
from requests_oauthlib import OAuth2Session
# OAuth2 settings (placeholder values).
client_id = 'your_client_id'
client_secret = 'your_client_secret'
authorization_url = 'https://api.example.com/oauth/authorize'
token_url = 'https://api.example.com/oauth/token'
redirect_uri = 'https://yourapp.com/callback'

# Create the OAuth2 session.
oauth = OAuth2Session(client_id, redirect_uri=redirect_uri)

# Build the authorization URL; note that ``authorization_url`` is rebound
# from the bare endpoint to the URL returned by the session.
authorization_url, state = oauth.authorization_url(authorization_url)
print(f"请访问: {authorization_url}")

# Exchange the callback URL entered by the user for an access token.
token = oauth.fetch_token(
    token_url,
    client_secret=client_secret,
    authorization_response=input("输入完整回调URL: "),
)

# Call the API through the session, which now holds the token.
response = oauth.get('https://api.example.com/api/data')
import hmac
import hashlib
import time
def generate_hmac_signature(api_key, api_secret, method, path, params=None):
    """Build an HMAC-SHA256 request signature.

    The signed payload is ``method``, ``path``, the sorted ``k=v`` query
    string and the current Unix timestamp (whole seconds), joined by
    newlines. ``api_key`` is accepted but not used in the computation.

    Returns:
        A ``(signature, timestamp)`` tuple: the hex digest and the
        timestamp string that was signed.
    """
    query_params = {} if params is None else params

    # Sort the parameters so the payload is independent of dict order.
    pairs = [f"{k}={v}" for k, v in sorted(query_params.items())]
    query_string = '&'.join(pairs)

    timestamp = str(int(time.time()))
    payload = f"{method}\n{path}\n{query_string}\n{timestamp}"

    secret_bytes = api_secret.encode('utf-8')
    payload_bytes = payload.encode('utf-8')
    digest = hmac.new(secret_bytes, payload_bytes, hashlib.sha256)
    return digest.hexdigest(), timestamp
# Example: sign a request with HMAC (placeholder credentials).
api_key = 'your_api_key'
api_secret = 'your_api_secret'
method = 'GET'
path = '/api/v1/ticker'
params = {'symbol': 'BTCUSDT'}

signature, timestamp = generate_hmac_signature(
    api_key, api_secret, method, path, params
)

# Send the key, the signed timestamp and the signature as headers.
headers = {
    'X-MBX-APIKEY': api_key,
    'X-TIMESTAMP': timestamp,
    'X-SIGNATURE': signature,
}
response = requests.get(f"https://api.example.com{path}", headers=headers, params=params)
import requests
import json
from typing import Dict, Any, Optional
import time
class APIClient:
    """Small JSON API client built on a shared ``requests.Session``.

    Attributes are set from the constructor arguments: ``base_url`` (with
    any trailing slash removed), ``api_key``, ``timeout`` and
    ``retry_count``.
    """

    def __init__(
        self,
        base_url: str,
        api_key: Optional[str] = None,
        timeout: int = 30,
        retry_count: int = 3
    ):
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout
        self.retry_count = retry_count
        self.session = requests.Session()

    def _get_headers(self) -> Dict[str, str]:
        """Return the headers sent with every request.

        ``X-API-Key`` is included only when an API key is configured.
        """
        headers = {
            'Content-Type': 'application/json',
            'User-Agent': 'QuantTrading/1.0'
        }
        if self.api_key:
            headers['X-API-Key'] = self.api_key
        return headers

    def _request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict] = None,
        data: Optional[Dict] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """Send a request and return the decoded JSON body.

        The request is retried with exponential backoff (1s, 2s, 4s, ...)
        whenever ``requests`` raises a ``RequestException``, which includes
        the ``HTTPError`` raised by ``raise_for_status``.

        Raises:
            requests.exceptions.RequestException: when the final attempt
                fails.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        # Always try at least once: with retry_count <= 0 the loop used to
        # be skipped entirely and the method silently returned None.
        attempts = max(1, self.retry_count)
        for attempt in range(attempts):
            try:
                response = self.session.request(
                    method=method,
                    url=url,
                    headers=self._get_headers(),
                    params=params,
                    json=data,
                    timeout=self.timeout,
                    **kwargs
                )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.RequestException:
                if attempt == attempts - 1:
                    raise
                time.sleep(2 ** attempt)  # exponential backoff

    def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict:
        """Send a GET request."""
        return self._request('GET', endpoint, params=params)

    def post(self, endpoint: str, data: Optional[Dict] = None) -> Dict:
        """Send a POST request with ``data`` as the JSON body."""
        return self._request('POST', endpoint, data=data)

    def put(self, endpoint: str, data: Optional[Dict] = None) -> Dict:
        """Send a PUT request with ``data`` as the JSON body."""
        return self._request('PUT', endpoint, data=data)

    def delete(self, endpoint: str) -> Dict:
        """Send a DELETE request."""
        return self._request('DELETE', endpoint)
class PaginatedAPIClient(APIClient):
    """API client with helpers for paginated and day-by-day endpoints."""

    def get_all_pages(
        self,
        endpoint: str,
        params: Optional[Dict] = None,
        page_size: int = 100
    ) -> list:
        """Fetch every page of ``endpoint`` and return the combined items.

        Pages are requested with ``page`` (starting at 1) and ``page_size``
        query parameters. Fetching stops at the first response whose
        ``data`` entry is missing or empty, or that holds fewer than
        ``page_size`` items.

        The caller's ``params`` dict is copied, not modified.
        """
        # Work on a copy: the paging keys used to leak into the caller's dict.
        query = dict(params) if params is not None else {}
        all_data = []
        page = 1
        while True:
            query['page'] = page
            query['page_size'] = page_size
            response = self.get(endpoint, query)
            batch = response.get('data')
            if not batch:
                break
            all_data.extend(batch)
            # A short page means there is nothing left to fetch.
            if len(batch) < page_size:
                break
            page += 1
        return all_data

    def fetch_historical_data(
        self,
        symbol: str,
        start_date: str,
        end_date: str
    ) -> list:
        """Fetch ``/stocks/{symbol}/history`` for each day in the range.

        ``start_date`` and ``end_date`` are ``YYYY-MM-DD`` strings and both
        ends are inclusive. One request is made per calendar day.

        Raises:
            ValueError: if either date does not match ``%Y-%m-%d``.
        """
        from datetime import datetime, timedelta

        date_format = '%Y-%m-%d'
        # Compare real dates rather than strings so non-zero-padded input
        # (e.g. '2024-1-5') still orders correctly.
        current = datetime.strptime(start_date, date_format)
        last = datetime.strptime(end_date, date_format)
        one_day = timedelta(days=1)

        all_data = []
        while current <= last:
            response = self.get(
                f'/stocks/{symbol}/history',
                params={'date': current.strftime(date_format)}
            )
            all_data.extend(response.get('data', []))
            current += one_day
        return all_data
import requests
from requests.exceptions import RequestException
class APIError(Exception):
    """Base class for API errors."""
class RateLimitError(APIError):
    """Rate-limit error."""
class AuthenticationError(APIError):
    """Authentication error."""
class APIClientWithErrorHandling(APIClient):
    """API client that maps HTTP failures to the ``APIError`` hierarchy."""

    def _handle_error(self, response: requests.Response):
        """Raise the ``APIError`` subclass matching the response status.

        Raises:
            AuthenticationError: status 401.
            RateLimitError: status 429.
            APIError: any other status >= 400. For 4xx the message comes
                from the body's ``message`` field when the body is a JSON
                object, otherwise a default message is used.
        """
        status_code = response.status_code
        if status_code == 401:
            raise AuthenticationError("API密钥无效或已过期")
        elif status_code == 429:
            raise RateLimitError("请求频率超限")
        elif status_code >= 500:
            raise APIError(f"服务器错误: {status_code}")
        elif status_code >= 400:
            # Error bodies are not guaranteed to be JSON objects. A decode
            # failure here used to surface as a misleading network error,
            # and a non-dict body raised AttributeError.
            try:
                error_data = response.json()
            except ValueError:
                error_data = None
            if isinstance(error_data, dict):
                message = error_data.get('message', '请求失败')
            else:
                message = '请求失败'
            raise APIError(message)

    def _request(self, method, endpoint, params=None, data=None, **kwargs):
        """Send a single request and return the decoded JSON body.

        Unlike the parent implementation, no retry is performed here.

        Raises:
            APIError: (or a subclass) for HTTP error statuses and for any
                ``RequestException`` raised while sending the request or
                decoding the body.
        """
        try:
            response = self.session.request(
                method=method,
                url=f"{self.base_url}/{endpoint.lstrip('/')}",
                headers=self._get_headers(),
                params=params,
                json=data,
                timeout=self.timeout,
                **kwargs
            )
            # Translate HTTP error statuses into domain exceptions.
            try:
                response.raise_for_status()
            except requests.HTTPError:
                self._handle_error(response)
            return response.json()
        except RequestException as e:
            # Keep the original exception as __cause__ for debugging.
            raise APIError(f"网络请求失败: {str(e)}") from e
import time
from functools import wraps
def retry_on_failure(
    max_retries=3,
    backoff_factor=2,
    exceptions=(Exception,)
):
    """Return a decorator that retries the wrapped function on failure.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated
            as 1, so the function is always called at least once.
        backoff_factor: Base of the wait time; the sleep before retry
            ``n`` (0-based) is ``backoff_factor ** n`` seconds.
        exceptions: Exception types that trigger a retry. Any other
            exception propagates immediately.

    Raises:
        The exception from the final attempt, once all attempts fail.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # With max_retries <= 0 the loop used to be skipped and
            # ``raise None`` produced a TypeError without calling func.
            attempts = max(1, max_retries)
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt >= attempts - 1:
                        # Out of attempts: re-raise the last failure.
                        raise
                    wait_time = backoff_factor ** attempt
                    print(f"重试 {attempt + 1}/{max_retries}, "
                          f"等待 {wait_time} 秒...")
                    time.sleep(wait_time)
        return wrapper
    return decorator
# Example: apply the retry decorator to a fetch helper.
@retry_on_failure(max_retries=3, backoff_factor=2)
def fetch_with_retry(client, endpoint):
    """Return ``client.get(endpoint)``, retried by the decorator on failure."""
    payload = client.get(endpoint)
    return payload
def parse_response(response):
    """Extract the payload from an API response.

    Returns the value under the first of ``data``, ``items`` or
    ``results`` found in ``response``; if none is present, ``response``
    itself is returned unchanged.
    """
    for payload_key in ('data', 'items', 'results'):
        if payload_key in response:
            return response[payload_key]
    return response
# Flatten nested response records.
def flatten_response(data):
    """Flatten one level of nesting in each record of ``data``.

    For every record, a dict-valued field ``key`` is replaced by one
    ``key_subkey`` entry per inner item; other fields are copied as-is.
    Only the first nesting level is expanded. Returns a new list of dicts.
    """
    def _flatten_record(record):
        flat = {}
        for name, val in record.items():
            if not isinstance(val, dict):
                flat[name] = val
                continue
            flat.update(
                (f"{name}_{inner_name}", inner_val)
                for inner_name, inner_val in val.items()
            )
        return flat

    return [_flatten_record(record) for record in data]
# 转换为DataFrame
import pandas as pd
def response_to_dataframe(response):
    """Convert an API response into a pandas DataFrame.

    The payload is extracted with ``parse_response``, flattened with
    ``flatten_response`` and passed to ``pd.DataFrame``.
    """
    return pd.DataFrame(flatten_response(parse_response(response)))
1. 永远不要在代码中硬编码API密钥
2. 使用环境变量或配置文件存储密钥
3. 定期轮换API密钥
4. 使用HTTPS进行所有API调用
5. 实施请求频率限制
6. 记录所有API调用日志
7. 及时撤销不用的API密钥