混合架构设计
在实际系统中,建议采用混合架构:使用历史数据构建和优化策略模型,实时数据驱动交易执行。 同时,实时数据需要定期回写到历史数据中,形成完整的数据闭环。这样可以兼顾研究深度和执行效率。
金融市场数据基础
历史数据用于策略回测和研究,实时数据用于实盘交易和监控。 两者在数据来源、处理方式、技术架构和性能要求上都有显著差异。 本节将全面对比这两种数据类型,并介绍各自的实现方案。
| 对比维度 | 历史数据 | 实时数据 |
|---|---|---|
| 数据来源 | 文件、数据库、API批量 | WebSocket、FIX协议、消息队列 |
| 处理模式 | 批处理(Batch) | 流处理(Stream) |
| 存储方式 | 时序数据库、文件系统 | 内存数据库、缓存 |
| 延迟要求 | >秒级到分钟级毫秒级到微秒级 | |
| 典型工具 | Pandas、Hadoop、Spark | Kafka、Storm、Flink |
| 容错机制 | 重试、检查点 | 消息确认、重放 |
import websocket
import json
import threading
class RealTimeDataHandler:
def __init__(self, on_data_callback):
self.ws = None
self.on_data = on_data_callback
self.reconnect_delay = 1
def on_message(self, ws, message):
try:
data = json.loads(message)
self.on_data(data)
except Exception as e:
print(f"解析错误: {e}")
def on_error(self, ws, error):
print(f"WebSocket错误: {error}")
def on_close(self, ws, close_status_code, close_msg):
print("连接关闭,尝试重连...")
self.reconnect()
def on_open(self, ws):
print("WebSocket连接已建立")
# 订阅数据
ws.send(json.dumps({
"action": "subscribe",
"symbol": "000001.SZ"
}))
def connect(self, url):
self.ws = websocket.WebSocketApp(
url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_open=self.on_open
)
self.ws.run_forever()
def reconnect(self):
threading.Timer(self.reconnect_delay, self.connect).start()
在实际系统中,建议采用混合架构:使用历史数据构建和优化策略模型,实时数据驱动交易执行。 同时,实时数据需要定期回写到历史数据中,形成完整的数据闭环。这样可以兼顾研究深度和执行效率。