性能优化检查清单
✓ 避免在数据操作中使用Python层面的显式循环
✓ 优先使用向量化操作
✓ 合理使用广播机制
✓ 选择合适的数据类型
✓ 对字符串列使用category类型
✓ 使用eval和query优化表达式
✓ 预分配内存
✓ 分块处理大数据
✓ 使用高效的数据结构
✓ 定期分析性能瓶颈
Python数据处理基础
性能优化是量化交易数据处理中的关键环节，直接影响系统的响应速度和吞吐量。本节将系统介绍NumPy和Pandas的各种性能优化技巧。
import time
import pandas as pd
import numpy as np
from line_profiler import LineProfiler
from memory_profiler import profile
# Basic timing. time.perf_counter() is a monotonic, high-resolution clock meant
# for measuring intervals; time.time() is wall-clock time and can jump when the
# system clock is adjusted.
start_time = time.perf_counter()
# your code here
end_time = time.perf_counter()
print(f"执行时间: {end_time - start_time:.4f}秒")
# timeit: run the statement n_runs times and return the TOTAL elapsed seconds.
# One variable drives both the run count and the divisor, so the reported
# average cannot drift out of sync. globals=globals() resolves your_function in
# the current namespace (works in scripts, notebooks and imported modules);
# your_function must already be defined when this runs.
import timeit

n_runs = 100
execution_time = timeit.timeit(
    'your_function()',
    globals=globals(),
    number=n_runs,
)
print(f"平均执行时间: {execution_time / n_runs:.6f}秒")
# Function to analyse with the line-level profiler below.
# It is intentionally NOT decorated with memory_profiler's @profile: that
# decorator wraps the function in memory_profiler's own line tracer, so
# LineProfiler would end up timing the wrapper and the two tracers interfere.
# Profile memory in a separate run instead (e.g. decorate with @profile and run
# `python -m memory_profiler script.py`).
def your_function():
    # function body goes here
    pass
# Run the line profiler: register the function, switch tracing on, call it,
# switch tracing off again, then print the per-line statistics.
line_profiler = LineProfiler()
line_profiler.add_function(your_function)
line_profiler.enable_by_count()
try:
    your_function()
finally:
    # Pair every enable_by_count() with disable_by_count(); otherwise the tracer
    # stays active and can slow down (and skew the timings of) later code.
    line_profiler.disable_by_count()
line_profiler.print_stats()
# DataFrame memory usage. memory_usage='deep' / deep=True also count the Python
# objects behind object-dtype columns (e.g. strings).
df.info(memory_usage='deep')
# Outside an interactive session a bare expression's value is discarded, so
# print the per-column breakdown explicitly.
print(df.memory_usage(deep=True))
# Average memory per column for each dtype family.
for dtype in ['float', 'int', 'object', 'bool']:
    selected_dtype = df.select_dtypes(include=[dtype])
    if selected_dtype.shape[1] == 0:
        # No columns of this dtype: there is nothing to average.
        continue
    # index=False: by default memory_usage() adds an 'Index' entry, which would
    # be averaged in as if it were one more column and distort the result.
    mean_usage_b = selected_dtype.memory_usage(deep=True, index=False).mean()
    mean_usage_mb = mean_usage_b / 1024 ** 2
    print(f"Average memory usage for {dtype}: {mean_usage_mb:.3f} MB")
# Cardinality of object-dtype columns: a low unique/total ratio suggests the
# column is a good candidate for the category dtype.
object_columns = df.select_dtypes(include=['object']).columns.tolist()
for name in object_columns:
    column = df[name]
    # dropna=False counts NaN as a value, matching len(column.unique())
    n_unique = column.nunique(dropna=False)
    n_total = len(column)
    print(f"{name}: {n_unique} unique / {n_total} total")
# 避免循环,使用向量化
import numpy as np
# Slow: an explicit Python loop, one element per iteration
def slow_way(arr):
    """Return ``arr * 2 + 1`` computed element by element (baseline for comparison).

    The output is allocated with ``np.zeros_like`` and so keeps ``arr``'s dtype.
    """
    out = np.zeros_like(arr)
    for idx, value in enumerate(arr):
        out[idx] = value * 2 + 1
    return out
# Fast: one vectorized expression evaluated by NumPy
def fast_way(arr):
    """Return ``arr * 2 + 1`` as whole-array NumPy operations."""
    doubled = arr * 2
    return doubled + 1
# Broadcasting: adding a (n_cols,) vector to every row of an (n_rows, n_cols)
# matrix. Shapes are derived from arr1 instead of repeating 1000/100, so the
# loop and the preallocated array stay correct if the sizes change.
arr1 = np.random.randn(1000, 100)
arr2 = np.random.randn(arr1.shape[1])  # one value per column
# Slow: explicit loop over rows
result_slow = np.zeros(arr1.shape)
for i in range(arr1.shape[0]):
    result_slow[i] = arr1[i] + arr2
# Fast: broadcasting applies arr2 to every row in one operation
result_fast = arr1 + arr2
# Vectorized conditionals: clip negatives to 0.
arr = np.random.randn(1000000)
# Slower (still vectorized, no Python loop): np.where first builds a boolean
# mask, then selects from two sources.
result_slow = np.where(arr > 0, arr, 0)
# Faster: a single built-in ufunc.
# Caveat: the two are not interchangeable on NaN input -- np.where maps NaN to
# 0 (NaN > 0 is False) while np.maximum propagates NaN.
result_fast = np.maximum(arr, 0)
# Contiguous memory. np.ascontiguousarray returns its input unchanged when it is
# already C-contiguous (a freshly created array is), so it only does real work
# for non-contiguous views such as a transpose or a strided slice.
arr = np.random.randn(10000, 100)
arr_view = arr.T  # transposed view: Fortran-ordered, not C-contiguous
arr_contiguous = np.ascontiguousarray(arr_view)  # C-ordered copy of the view
# Preallocate the output buffer once. When passed as out=, its shape and dtype
# must match the result of broadcasting the inputs (arr1 is (1000, 100) and
# arr2 is (100,) above); a flat 10,000,000-element buffer would make np.add
# raise ValueError. np.empty skips the zero-fill because every element is
# overwritten.
result = np.empty(np.broadcast(arr1, arr2).shape, dtype=np.result_type(arr1, arr2))
# In-place operations: modify arr without creating a new array
arr *= 2
arr += 1
# Write arr1 + arr2 straight into the preallocated buffer (no temporary array)
np.add(arr1, arr2, out=result)
# Use smaller dtypes when the value range (and, for floats, the precision) allows.
arr_float64 = np.random.randn(10000000)  # float64 by default
arr_float32 = arr_float64.astype(np.float32)  # half the memory; ~7 significant digits
# Integer types. The dtype is explicit because np.arange's default integer type
# is platform dependent (int32 on Windows before NumPy 2.0).
arr_int64 = np.arange(10000000, dtype=np.int64)
# Check the value range BEFORE downcasting: astype() silently wraps integers
# that do not fit the target type instead of raising.
print(arr_float64.min(), arr_float64.max())
print(arr_int64.min(), arr_int64.max())


def downcast_if_fits(values, dtype):
    """Return ``values`` cast to the integer ``dtype`` if every element fits.

    Otherwise ``values`` is returned unchanged, so an out-of-range cast can
    never silently corrupt the data.
    """
    info = np.iinfo(dtype)
    if values.size and (values.min() < info.min or values.max() > info.max):
        return values
    return values.astype(dtype)


arr_int32 = downcast_if_fits(arr_int64, np.int32)  # 0..9,999,999 fits in int32
arr_int16 = downcast_if_fits(arr_int64, np.int16)  # max 32767: stays int64 here
# Unsigned types
arr_uint8 = downcast_if_fits(arr_int64, np.uint8)  # range 0-255: stays int64 here
import pandas as pd
import numpy as np
# Sample data: one million rows -- two standard-normal float columns and a
# string column drawn from three labels. Columns are generated in the order
# A, B, C.
n_samples = 1_000_000
col_a = np.random.randn(n_samples)
col_b = np.random.randn(n_samples)
col_c = np.random.choice(['X', 'Y', 'Z'], n_samples)
df = pd.DataFrame({'A': col_a, 'B': col_b, 'C': col_c})
# Slow: a Python loop with positional scalar access per row
def slow_loop(df):
    """Row-wise ``A + B`` via a Python loop (baseline for the faster variants).

    The returned Series carries ``df.index`` so that it lines up with
    ``df['A'] + df['B']``; previously it got a fresh 0..n-1 index and was
    misaligned with ``df`` whenever the frame had a non-default index.
    """
    result = []
    for i in range(len(df)):
        result.append(df['A'].iloc[i] + df['B'].iloc[i])
    return pd.Series(result, index=df.index)
# Medium: DataFrame.apply still calls a Python function once per row
def medium_apply(df):
    """Row-wise ``A + B`` computed through ``DataFrame.apply`` over rows."""
    def add_a_b(row):
        return row['A'] + row['B']

    return df.apply(add_a_b, axis='columns')
# Fast: whole-column arithmetic
def fast_vectorized(df):
    """Return ``A + B`` computed on entire columns (aligned on the index)."""
    return df['A'].add(df['B'])
# eval: DataFrame.eval parses the expression string and can evaluate it with
# numexpr when that package is installed. Per the pandas performance guide it
# only pays off for large frames and compound expressions; for a simple
# 'A + B' the parsing overhead usually makes it no faster (often slower) than
# plain vectorized arithmetic -- benchmark before choosing it.
def fastest_eval(df):
    """Return ``A + B`` evaluated through ``DataFrame.eval``.

    The name is kept for compatibility; it is not guaranteed to be the fastest
    variant.
    """
    return df.eval('A + B')
# Vectorized multi-branch labelling with np.select
def complex_condition(df):
    """Label each row by the signs of columns A and B.

    Conditions are checked in order: both positive; A positive and B <= 0;
    A <= 0 and B positive. Anything else -- including rows where A or B is
    NaN -- is labelled 'Other'. Returns a NumPy array of strings.
    """
    a_pos, b_pos = df['A'] > 0, df['B'] > 0
    a_nonpos, b_nonpos = df['A'] <= 0, df['B'] <= 0
    masks_by_label = {
        'Both Positive': a_pos & b_pos,
        'A Positive': a_pos & b_nonpos,
        'B Positive': a_nonpos & b_pos,
    }
    return np.select(
        list(masks_by_label.values()),
        list(masks_by_label.keys()),
        default='Other',
    )
# Optimize a string column with the category dtype and report the saving.
# Memory is measured BEFORE the conversion; converting first (as before) made
# "before" and "after" identical, so the reported saving was ~0%.
memory_before = df.memory_usage(deep=True).sum()
print(f"优化前内存: {memory_before / 1024**2:.2f} MB")
df['C'] = df['C'].astype('category')
memory_after = df.memory_usage(deep=True).sum()
print(f"优化后内存: {memory_after / 1024**2:.2f} MB")
print(f"节省内存: {(memory_before - memory_after) / memory_before * 100:.1f}%")
# An index on C supports label-based lookups (used at the end of this block)
df_indexed = df.set_index('C')
# Filter with vectorized isin instead of a Python loop over rows
values = ['X', 'Y']
# Boolean-mask filter. This is already vectorized -- the slow alternative is a
# Python loop over rows. The three filters below return the same rows.
result_slow = df[df['C'].isin(values)]
# Same mask via .loc
result_loc = df.loc[df['C'].isin(values)]
# Same filter via query; @values refers to the Python variable above
result_query = df.query('C in @values')
# Label lookup through the index built above
result_indexed = df_indexed.loc[values]
# MultiIndex lookups. pandas recommends a sorted (lexsorted) MultiIndex for
# lookups and slicing; unsorted indexes emit a PerformanceWarning ("indexing
# past lexsort depth") on partial-key lookups.
df_multi = df.set_index(['A', 'B', 'C']).sort_index()
# A and B hold random floats, so a hand-written key such as (0.5, 0.5, 'X')
# almost surely does not exist and .loc would raise KeyError. Float keys must
# match exactly, so look up a key that is known to be present.
lookup_key = df_multi.index[0]
result_multi = df_multi.loc[lookup_key]
# Prefer built-in aggregations over apply.
# observed=True: C was converted to the category dtype above, and pandas >= 2.1
# warns about the changing default for categorical groupers. All categories
# occur in the data, so the result is unchanged.
# Selecting ['A'] before apply avoids pandas' deprecation of apply operating on
# the grouping columns.
# Slower: apply calls a Python function once per group
result_slow = df.groupby('C', observed=True)['A'].apply(lambda s: s.mean())
# Faster: built-in aggregation
result_fast = df.groupby('C', observed=True)['A'].mean()
# Prefer transform over apply to broadcast a group statistic back to every row.
# Assigning the per-group means directly (one value per group, via .values)
# raises ValueError because the column needs one value per row. The apply-based
# version must return a Series indexed like each group; group_keys=False keeps
# the original row index so the assignment aligns.
# Slower: apply, one Python call per group
df['mean_A'] = (
    df.groupby('C', observed=True, group_keys=False)['A']
    .apply(lambda s: pd.Series(s.mean(), index=s.index))
)
# Faster: transform broadcasts the built-in aggregation to every row
df['mean_A'] = df.groupby('C', observed=True)['A'].transform('mean')
def downcast(df):
    """Shrink numeric columns to save memory.

    float64 columns go through ``pd.to_numeric(downcast='float')`` and int64
    columns through ``pd.to_numeric(downcast='integer')``. ``df`` is modified
    in place and also returned, so pass ``df.copy()`` to keep the original.
    """
    for source_dtype, target_kind in (('float64', 'float'), ('int64', 'integer')):
        columns = df.select_dtypes(include=[source_dtype]).columns.tolist()
        df[columns] = df[columns].apply(pd.to_numeric, downcast=target_kind)
    return df
# Downcast a copy (the original df is left untouched) and compare the total
# memory footprint of both frames in MB.
df_optimized = downcast(df.copy())
mb_before = df.memory_usage(deep=True).sum() / 1024**2
mb_after = df_optimized.memory_usage(deep=True).sum() / 1024**2
print("优化前:", mb_before, "MB")
print("优化后:", mb_after, "MB")
# Read a large file in chunks.
chunk_size = 10000
result = []
# The chunked reader is a context manager (pandas >= 1.2): the file handle is
# closed even if process_chunk raises part-way through.
with pd.read_csv('large_file.csv', chunksize=chunk_size) as reader:
    for chunk in reader:
        processed = process_chunk(chunk)  # placeholder: user-supplied processing function
        result.append(processed)
# pd.concat([]) raises ValueError, so handle the case where no chunk was produced.
final_result = pd.concat(result, ignore_index=True) if result else pd.DataFrame()
# Dask for data that does not fit in memory. Dask operations are lazy;
# .compute() executes the work and returns an in-memory pandas result.
import dask.dataframe as dd

ddf = dd.read_csv('very_large_file.csv')
grouped_means = ddf.groupby('column').mean()
result = grouped_means.compute()
# Drop duplicate rows
df.drop_duplicates(inplace=True)
# Drop columns that are no longer needed. These names are examples and do not
# exist in the sample frame; errors='ignore' skips absent names instead of
# raising KeyError.
cols_to_drop = ['temp_col1', 'temp_col2']
df.drop(columns=cols_to_drop, inplace=True, errors='ignore')
# Release memory promptly: drop the reference to a large object, then force a
# full garbage-collection pass.
import gc

# NOTE(review): large_data_frame is a placeholder that is not defined in the
# code shown here -- substitute the object you want to free.
del large_data_frame
gc.collect()
# Vectorized simple moving average
def moving_average_vectorized(prices, window):
    """Return the simple moving average of ``prices`` over ``window`` points.

    Uses ``np.convolve`` with ``mode='valid'``, so the output has
    ``len(prices) - window + 1`` values (only fully populated windows).

    Returns an empty float array when ``window`` exceeds the number of prices.
    ``np.convolve`` would otherwise swap its arguments and return meaningless
    partial averages.

    Raises:
        ValueError: if ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError(f"window must be >= 1, got {window!r}")
    prices = np.asarray(prices)
    if window > prices.size:
        return np.empty(0)
    return np.convolve(prices, np.ones(window) / window, mode='valid')
# Vectorized log returns
def returns_vectorized(prices):
    """Return consecutive log returns ``log(p[t]) - log(p[t-1])``.

    The result has one element fewer than ``prices``.
    """
    log_prices = np.log(prices)
    return np.diff(log_prices)
# Batch computation of technical indicators
def batch_technical_indicators(df):
    """Return a new DataFrame of indicators derived from ``df['close']``.

    Columns: sma_5 / sma_20 (rolling means), ema_12 / ema_26 (exponential
    moving averages by span), std_20 (20-period rolling standard deviation) and
    atr from ``calculate_atr(df)``. calculate_atr is defined elsewhere;
    presumably it needs high/low/close columns -- confirm. The result shares
    ``df``'s index.
    """
    close = df['close']
    indicators = pd.DataFrame(index=df.index)
    # Price indicators
    for name, window in (('sma_5', 5), ('sma_20', 20)):
        indicators[name] = close.rolling(window).mean()
    for name, span in (('ema_12', 12), ('ema_26', 26)):
        indicators[name] = close.ewm(span=span).mean()
    # Volatility indicators
    indicators['std_20'] = close.rolling(20).std()
    indicators['atr'] = calculate_atr(df)
    return indicators
# Precompute frequently used derived columns
def precompute_indicators(df):
    """Add return and volatility columns to ``df`` in place and return it.

    Adds ``returns`` (percentage change of close), ``log_returns``
    (log of close / previous close) and ``volatility`` (20-period rolling
    standard deviation of ``returns``).
    """
    close = df['close']
    df['returns'] = close.pct_change()
    df['log_returns'] = np.log(close / close.shift(1))
    df['volatility'] = df['returns'].rolling(20).std()
    return df
# Use plain NumPy arrays instead of a DataFrame for the backtest hot path
def backtest_numpy(prices, signals):
    """Fast backtest on NumPy arrays.

    ``prices`` and ``signals`` must have the same length n. Simple returns
    r[k] = (prices[k+1] - prices[k]) / prices[k] (length n-1) are multiplied
    by ``signals[1:]``, the same alignment fast_backtest uses. The previous
    ``returns[:-1] * signals[1:]`` paired arrays of lengths n-2 and n-1 and
    raised for n > 3. Returns the cumulative growth of one unit, length n-1.

    NOTE(review): signals[i] is applied to the return from i-1 to i; if
    signals[i] is computed from prices[i] this is look-ahead -- shift signals
    by one bar before calling. Confirm the intended alignment with callers.

    Raises:
        ValueError: if ``prices`` and ``signals`` differ in length.
    """
    prices = np.asarray(prices, dtype=float)
    signals = np.asarray(signals, dtype=float)
    if len(prices) != len(signals):
        raise ValueError(
            f"prices and signals must have equal length, got {len(prices)} and {len(signals)}"
        )
    returns = np.diff(prices) / prices[:-1]
    strategy_returns = returns * signals[1:]
    return np.cumprod(1 + strategy_returns)
# Numba-accelerated backtest
from numba import jit


@jit(nopython=True)
def fast_backtest(prices, signals):
    """Backtest compiled with numba in nopython mode.

    Computes simple returns r[k] = (prices[k+1] - prices[k]) / prices[k] in an
    explicit loop, multiplies them by ``signals[1:]`` and returns the
    cumulative growth of one unit (length len(prices) - 1).

    NOTE(review): signals[i] is paired with the return from i-1 to i. If
    signals[i] is derived from prices[i] this uses information not yet
    available at i-1 (look-ahead); shift signals by one bar in that case --
    confirm the intended alignment with callers.
    """
    num_prices = len(prices)
    simple_returns = np.empty(num_prices - 1)
    for k in range(num_prices - 1):
        simple_returns[k] = (prices[k + 1] - prices[k]) / prices[k]
    strategy_returns = simple_returns * signals[1:]
    return np.cumprod(1 + strategy_returns)
✓ 避免在数据操作中使用Python层面的显式循环
✓ 优先使用向量化操作
✓ 合理使用广播机制
✓ 选择合适的数据类型
✓ 对字符串列使用category类型
✓ 使用eval和query优化表达式
✓ 预分配内存
✓ 分块处理大数据
✓ 使用高效的数据结构
✓ 定期分析性能瓶颈