注意事项
1. 删除数据前要评估样本量是否充足
2. 填充方法要根据数据特性选择
3. 记录缺失值处理过程,便于追溯
4. 注意填充可能引入的偏差
5. 对关键指标进行敏感性分析
Python数据处理基础
缺失值是金融数据中常见的问题,可能源于停牌、数据传输错误、交易暂停等原因。 正确处理缺失值是保证量化分析质量的关键环节。
# 停牌导致的数据缺失
# 交易时段外无成交
# 新股上市初期
# 数据源故障
# 除权除息日处理
# 节假日调整
import pandas as pd
import numpy as np
# 基本检测
print(df.isnull().sum()) # 每列缺失值数量
print(df.isnull().sum() / len(df) * 100) # 缺失值比例
# 缺失值详情
missing_info = pd.DataFrame({
'列名': df.columns,
'缺失数量': df.isnull().sum().values,
'缺失比例': (df.isnull().sum() / len(df) * 100).values
})
missing_info = missing_info[missing_info['缺失数量'] > 0]
# 缺失模式分析
df['missing_pattern'] = df.isnull().astype(int).sum(axis=1)
print(df['missing_pattern'].value_counts())
# 连续缺失检测
def find_consecutive_missing(series):
missing = series.isnull()
groups = (missing != missing.shift()).cumsum()
result = series.groupby(groups).apply(
lambda x: (x.index[0], x.index[-1], len(x))
)
return result[result.apply(lambda x: x[2] > 0)]
import matplotlib.pyplot as plt
import seaborn as sns
# 热力图
plt.figure(figsize=(12, 8))
sns.heatmap(df.isnull(), cbar=False, cmap='viridis')
plt.title('缺失值热力图')
plt.show()
# 缺失值柱状图
missing_counts = df.isnull().sum()
missing_counts[missing_counts > 0].plot(kind='bar')
plt.title('各列缺失值数量')
plt.ylabel('缺失值数量')
plt.show()
# 缺失值比例图
missing_percentages = (df.isnull().sum() / len(df) * 100)
missing_percentages[missing_percentages > 0].plot(kind='bar')
plt.title('各列缺失值比例')
plt.ylabel('百分比 (%)')
plt.show()
# 使用missingno库
import missingno as msno
# 矩阵图
msno.matrix(df)
# 条形图
msno.bar(df)
# 热力图
msno.heatmap(df)
# 树状图
msno.dendrogram(df)
# 删除包含缺失的行
df_dropped = df.dropna()
# 删除包含缺失的列
df_dropped_cols = df.dropna(axis=1)
# 列表删除
df_dropped_subset = df.dropna(subset=['price', 'volume'])
# 阈值删除(至少有n个非缺失值才保留)
df_dropped_thresh = df.dropna(thresh=3)
# 全删除
df_all_dropped = df.dropna(how='all')
# 按位置删除
df_inplace_drop = df.copy()
df_inplace_drop.dropna(inplace=True)
# 填充固定值
df_filled_zero = df.fillna(0)
df_filled_mean = df.fillna(df.mean())
df_filled_median = df.fillna(df.median())
df_filled_mode = df.fillna(df.mode().iloc[0])
# 前向填充(用前一个值填充)
df_ffill = df.fillna(method='ffill')
df_ffill_limit = df.fillna(method='ffill', limit=2) # 限制连续填充
# 后向填充(用后一个值填充)
df_bfill = df.fillna(method='bfill')
# 插值法
df_linear = df.interpolate(method='linear') # 线性插值
df_polynomial = df.interpolate(method='polynomial', order=2)
df_spline = df.interpolate(method='spline', order=3)
df_time = df.interpolate(method='time') # 时间插值
# 最近邻插值
df_nearest = df.interpolate(method='nearest')
# 按组填充
df['price'] = df.groupby('symbol')['price'].transform(
lambda x: x.fillna(x.mean())
)
# 滚动窗口填充
df['price_ma_filled'] = df.groupby('symbol')['price'].transform(
lambda x: x.fillna(x.rolling(5, min_periods=1).mean())
)
# 基于相关性填充
def fill_by_correlation(df, target_col, corr_threshold=0.8):
# 找出高相关性列
corr_matrix = df.corr()
high_corr_cols = corr_matrix[target_col][
abs(corr_matrix[target_col]) > corr_threshold
].index.tolist()
# 使用这些列填充
for col in high_corr_cols:
df[target_col] = df[target_col].fillna(
df.groupby(col)[target_col].transform('mean')
)
return df
# 机器学习填充
from sklearn.impute import KNNImputer, SimpleImputer
# KNN填充
imputer_knn = KNNImputer(n_neighbors=5)
df_imputed = pd.DataFrame(
imputer_knn.fit_transform(df.select_dtypes(include=[np.number])),
columns=df.select_dtypes(include=[np.number]).columns
)
# 多重插补
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
imputer_mice = IterativeImputer(max_iter=10)
df_mice = pd.DataFrame(
imputer_mice.fit_transform(df.select_dtypes(include=[np.number])),
columns=df.select_dtypes(include=[np.number]).columns
)
# 价格序列缺失值处理
def handle_price_missing(df, price_col='close'):
df_copy = df.copy()
# 使用前值填充(价格通常延续)
df_copy[price_col] = df_copy[price_col].fillna(method='ffill')
# 如果开头有缺失,使用后值填充
df_copy[price_col] = df_copy[price_col].fillna(method='bfill')
return df_copy
# 停牌期间处理
def handle_suspended_data(df, price_col='close', volume_col='volume'):
df_copy = df.copy()
# 停牌期间成交量为0
df_copy[volume_col] = df_copy[volume_col].fillna(0)
# 停牌期间价格维持前值
df_copy[price_col] = df_copy.groupby('symbol')[price_col].transform(
lambda x: x.fillna(method='ffill')
)
return df_copy
# 收益率缺失处理(通常设为0)
df['returns'] = df['returns'].fillna(0)
# 或者跳过缺失日
df_non_missing = df.dropna(subset=['returns'])
# 使用行业平均收益率填充
df['returns'] = df.groupby(['industry', 'date'])['returns'].transform(
lambda x: x.fillna(x.mean())
)
# 时间序列插值
df.set_index('date', inplace=True)
# 线性插值(适合连续变化数据)
df['price_interpolated'] = df['price'].interpolate(method='time')
# 分段线性插值(考虑停牌等特殊情况)
def piecewise_linear_interpolation(series, window=5):
# 在非缺失窗口内插值
result = series.copy()
for i in range(len(series)):
if pd.isna(series.iloc[i]):
# 寻找前后非缺失值
prev_idx = i - 1
while prev_idx >= 0 and pd.isna(series.iloc[prev_idx]):
prev_idx -= 1
next_idx = i + 1
while next_idx < len(series) and pd.isna(series.iloc[next_idx]):
next_idx += 1
# 在窗口范围内插值
if prev_idx >= i - window and next_idx <= i + window:
result.iloc[i] = series.iloc[i-1:i+2].interpolate().iloc[1]
return result
df['price_piecewise'] = piecewise_linear_interpolation(df['price'])
1. 删除数据前要评估样本量是否充足
2. 填充方法要根据数据特性选择
3. 记录缺失值处理过程,便于追溯
4. 注意填充可能引入的偏差
5. 对关键指标进行敏感性分析
1. 先分析缺失机制(MCAR/MAR/MNAR)
2. 保留原始数据,处理使用副本
3. 多种方法对比,选择最适合的
4. 评估填充方法对后续分析的影响
5. 建立缺失值处理流水线,保持一致性