🎯 学习目标

  • 掌握RESTful API的基本概念和调用方法
  • 学会各种API认证机制的实现
  • 掌握API请求的参数处理和响应解析
  • 了解API调用的最佳实践和错误处理
API调用与认证

API调用与认证

API调用是量化交易获取数据的核心方式,通过标准化的接口访问金融数据服务。 本节将详细介绍API调用的各种认证机制和最佳实践。

🔐 API认证机制

API Key认证

import requests
import os

# 从环境变量获取API Key
API_KEY = os.getenv('API_KEY')

# 方式1:在URL参数中传递
response = requests.get(
    'https://api.example.com/stocks/AAPL',
    params={'apikey': API_KEY}
)

# 方式2:在请求头中传递
headers = {
    'X-API-Key': API_KEY,
    'Content-Type': 'application/json'
}
response = requests.get(
    'https://api.example.com/stocks/AAPL',
    headers=headers
)

# 方式3:使用Authorization头
headers = {
    'Authorization': f'Bearer {API_KEY}'
}
response = requests.get(url, headers=headers)

OAuth 2.0认证

from requests_oauthlib import OAuth2Session

# OAuth2配置
client_id = 'your_client_id'
client_secret = 'your_client_secret'
authorization_url = 'https://api.example.com/oauth/authorize'
token_url = 'https://api.example.com/oauth/token'
redirect_uri = 'https://yourapp.com/callback'

# 创建OAuth2会话
oauth = OAuth2Session(client_id, redirect_uri=redirect_uri)

# 获取授权URL
authorization_url, state = oauth.authorization_url(
    authorization_url
)
print(f"请访问: {authorization_url}")

# 获取访问令牌
token = oauth.fetch_token(
    token_url,
    client_secret=client_secret,
    authorization_response=input("输入完整回调URL: ")
)

# 使用令牌访问API
response = oauth.get('https://api.example.com/api/data')

HMAC签名认证

import hmac
import hashlib
import time

def generate_hmac_signature(api_key, api_secret, method, path, params=None):
    """生成HMAC签名"""
    if params is None:
        params = {}

    # 构造签名字符串
    timestamp = str(int(time.time()))
    query_string = '&'.join(f"{k}={v}" for k, v in sorted(params.items()))
    sign_string = f"{method}\n{path}\n{query_string}\n{timestamp}"

    # 生成签名
    signature = hmac.new(
        api_secret.encode('utf-8'),
        sign_string.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()

    return signature, timestamp

# 使用HMAC签名
api_key = 'your_api_key'
api_secret = 'your_api_secret'

method = 'GET'
path = '/api/v1/ticker'
params = {'symbol': 'BTCUSDT'}

signature, timestamp = generate_hmac_signature(
    api_key, api_secret, method, path, params
)

headers = {
    'X-MBX-APIKEY': api_key,
    'X-TIMESTAMP': timestamp,
    'X-SIGNATURE': signature
}

response = requests.get(
    f"https://api.example.com{path}",
    headers=headers,
    params=params
)

📡 API请求封装

API客户端封装

import requests
import json
from typing import Dict, Any, Optional
import time

class APIClient:
    """API客户端封装"""

    def __init__(
        self,
        base_url: str,
        api_key: Optional[str] = None,
        timeout: int = 30,
        retry_count: int = 3
    ):
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout
        self.retry_count = retry_count
        self.session = requests.Session()

    def _get_headers(self) -> Dict[str, str]:
        """获取请求头"""
        headers = {
            'Content-Type': 'application/json',
            'User-Agent': 'QuantTrading/1.0'
        }
        if self.api_key:
            headers['X-API-Key'] = self.api_key
        return headers

    def _request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict] = None,
        data: Optional[Dict] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """发送请求"""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        for attempt in range(self.retry_count):
            try:
                response = self.session.request(
                    method=method,
                    url=url,
                    headers=self._get_headers(),
                    params=params,
                    json=data,
                    timeout=self.timeout,
                    **kwargs
                )
                response.raise_for_status()
                return response.json()

            except requests.exceptions.RequestException as e:
                if attempt == self.retry_count - 1:
                    raise
                time.sleep(2 ** attempt)  # 指数退避

    def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict:
        """GET请求"""
        return self._request('GET', endpoint, params=params)

    def post(self, endpoint: str, data: Optional[Dict] = None) -> Dict:
        """POST请求"""
        return self._request('POST', endpoint, data=data)

    def put(self, endpoint: str, data: Optional[Dict] = None) -> Dict:
        """PUT请求"""
        return self._request('PUT', endpoint, data=data)

    def delete(self, endpoint: str) -> Dict:
        """DELETE请求"""
        return self._request('DELETE', endpoint)

分页处理

class PaginatedAPIClient(APIClient):
    """支持分页的API客户端"""

    def get_all_pages(
        self,
        endpoint: str,
        params: Optional[Dict] = None,
        page_size: int = 100
    ) -> list:
        """获取所有分页数据"""
        if params is None:
            params = {}

        all_data = []
        page = 1

        while True:
            params['page'] = page
            params['page_size'] = page_size

            response = self.get(endpoint, params)

            if not response.get('data'):
                break

            all_data.extend(response['data'])

            # 检查是否还有下一页
            if len(response['data']) < page_size:
                break

            page += 1

        return all_data

    def fetch_historical_data(
        self,
        symbol: str,
        start_date: str,
        end_date: str
    ) -> list:
        """获取历史数据"""
        all_data = []
        current_date = start_date

        while current_date <= end_date:
            response = self.get(
                f'/stocks/{symbol}/history',
                params={'date': current_date}
            )

            all_data.extend(response.get('data', []))

            # 移动到下一天
            from datetime import datetime, timedelta
            current_date = (datetime.strptime(current_date, '%Y-%m-%d') +
                          timedelta(days=1)).strftime('%Y-%m-%d')

        return all_data

⚠️ 错误处理与重试

错误处理

import requests
from requests.exceptions import RequestException

class APIError(Exception):
    """API错误基类"""
    pass

class RateLimitError(APIError):
    """速率限制错误"""
    pass

class AuthenticationError(APIError):
    """认证错误"""
    pass

class APIClientWithErrorHandling(APIClient):
    """带错误处理的API客户端"""

    def _handle_error(self, response: requests.Response):
        """处理API错误"""
        status_code = response.status_code

        if status_code == 401:
            raise AuthenticationError("API密钥无效或已过期")
        elif status_code == 429:
            raise RateLimitError("请求频率超限")
        elif status_code >= 500:
            raise APIError(f"服务器错误: {status_code}")
        elif status_code >= 400:
            error_data = response.json()
            raise APIError(error_data.get('message', '请求失败'))

    def _request(self, method, endpoint, params=None, data=None, **kwargs):
        """带错误处理的请求"""
        try:
            response = self.session.request(
                method=method,
                url=f"{self.base_url}/{endpoint.lstrip('/')}",
                headers=self._get_headers(),
                params=params,
                json=data,
                timeout=self.timeout,
                **kwargs
            )

            # 检查HTTP错误
            try:
                response.raise_for_status()
            except requests.HTTPError as e:
                self._handle_error(response)

            return response.json()

        except RequestException as e:
            raise APIError(f"网络请求失败: {str(e)}")

智能重试

import time
from functools import wraps

def retry_on_failure(
    max_retries=3,
    backoff_factor=2,
    exceptions=(Exception,)
):
    """重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        wait_time = backoff_factor ** attempt
                        print(f"重试 {attempt + 1}/{max_retries}, "
                              f"等待 {wait_time} 秒...")
                        time.sleep(wait_time)

            raise last_exception
        return wrapper
    return decorator

# 使用重试装饰器
@retry_on_failure(max_retries=3, backoff_factor=2)
def fetch_with_retry(client, endpoint):
    """带重试的数据获取"""
    return client.get(endpoint)

📊 响应处理

JSON响应处理

def parse_response(response):
    """解析API响应"""
    if 'data' in response:
        return response['data']
    elif 'items' in response:
        return response['items']
    elif 'results' in response:
        return response['results']
    else:
        return response

# 处理嵌套响应
def flatten_response(data):
    """展平嵌套的响应数据"""
    flattened = []

    for item in data:
        flat_item = {}
        for key, value in item.items():
            if isinstance(value, dict):
                for sub_key, sub_value in value.items():
                    flat_item[f"{key}_{sub_key}"] = sub_value
            else:
                flat_item[key] = value
        flattened.append(flat_item)

    return flattened

# 转换为DataFrame
import pandas as pd

def response_to_dataframe(response):
    """将API响应转换为DataFrame"""
    data = parse_response(response)
    flattened = flatten_response(data)
    return pd.DataFrame(flattened)
⚠️
安全建议

1. 永远不要在代码中硬编码API密钥
2. 使用环境变量或配置文件存储密钥
3. 定期轮换API密钥
4. 使用HTTPS进行所有API调用
5. 实施请求频率限制
6. 记录所有API调用日志
7. 及时撤销不用的API密钥

📝 本节小结

  • • 掌握了各种API认证机制
  • • 学会了API请求的封装方法
  • • 理解了错误处理和重试策略
  • • 掌握了响应数据的处理技巧