🎯 学习目标

  • 理解金融数据中缺失值的产生原因和类型
  • 掌握各种缺失值检测和可视化方法
  • 学会不同场景下的缺失值处理策略
  • 了解缺失值处理对量化分析的影响
缺失值处理

缺失值处理

缺失值是金融数据中常见的问题,可能源于停牌、数据传输错误、交易暂停等原因。 正确处理缺失值是保证量化分析质量的关键环节。

🔍 缺失值类型与成因

缺失值分类

  • MCAR(完全随机缺失):缺失与其他变量无关
  • MAR(随机缺失):缺失与其他已观测变量相关
  • MNAR(非随机缺失):缺失与未观测变量相关

金融数据常见缺失场景

# 停牌导致的数据缺失
# 交易时段外无成交
# 新股上市初期
# 数据源故障
# 除权除息日处理
# 节假日调整

缺失值检测

import pandas as pd
import numpy as np

# 基本检测
print(df.isnull().sum())  # 每列缺失值数量
print(df.isnull().sum() / len(df) * 100)  # 缺失值比例

# 缺失值详情
missing_info = pd.DataFrame({
    '列名': df.columns,
    '缺失数量': df.isnull().sum().values,
    '缺失比例': (df.isnull().sum() / len(df) * 100).values
})
missing_info = missing_info[missing_info['缺失数量'] > 0]

# 缺失模式分析
df['missing_pattern'] = df.isnull().astype(int).sum(axis=1)
print(df['missing_pattern'].value_counts())

# 连续缺失检测
def find_consecutive_missing(series):
    missing = series.isnull()
    groups = (missing != missing.shift()).cumsum()
    result = series.groupby(groups).apply(
        lambda x: (x.index[0], x.index[-1], len(x))
    )
    return result[result.apply(lambda x: x[2] > 0)]

📊 缺失值可视化

缺失值可视化方法

import matplotlib.pyplot as plt
import seaborn as sns

# 热力图
plt.figure(figsize=(12, 8))
sns.heatmap(df.isnull(), cbar=False, cmap='viridis')
plt.title('缺失值热力图')
plt.show()

# 缺失值柱状图
missing_counts = df.isnull().sum()
missing_counts[missing_counts > 0].plot(kind='bar')
plt.title('各列缺失值数量')
plt.ylabel('缺失值数量')
plt.show()

# 缺失值比例图
missing_percentages = (df.isnull().sum() / len(df) * 100)
missing_percentages[missing_percentages > 0].plot(kind='bar')
plt.title('各列缺失值比例')
plt.ylabel('百分比 (%)')
plt.show()

# 使用missingno库
import missingno as msno

# 矩阵图
msno.matrix(df)

# 条形图
msno.bar(df)

# 热力图
msno.heatmap(df)

# 树状图
msno.dendrogram(df)

🔧 缺失值处理策略

删除法

# 删除包含缺失的行
df_dropped = df.dropna()

# 删除包含缺失的列
df_dropped_cols = df.dropna(axis=1)

# 列表删除
df_dropped_subset = df.dropna(subset=['price', 'volume'])

# 阈值删除(至少有n个非缺失值才保留)
df_dropped_thresh = df.dropna(thresh=3)

# 全删除
df_all_dropped = df.dropna(how='all')

# 按位置删除
df_inplace_drop = df.copy()
df_inplace_drop.dropna(inplace=True)

填充法 - 基础方法

# 填充固定值
df_filled_zero = df.fillna(0)
df_filled_mean = df.fillna(df.mean())
df_filled_median = df.fillna(df.median())
df_filled_mode = df.fillna(df.mode().iloc[0])

# 前向填充(用前一个值填充)
df_ffill = df.fillna(method='ffill')
df_ffill_limit = df.fillna(method='ffill', limit=2)  # 限制连续填充

# 后向填充(用后一个值填充)
df_bfill = df.fillna(method='bfill')

# 插值法
df_linear = df.interpolate(method='linear')  # 线性插值
df_polynomial = df.interpolate(method='polynomial', order=2)
df_spline = df.interpolate(method='spline', order=3)
df_time = df.interpolate(method='time')  # 时间插值

# 最近邻插值
df_nearest = df.interpolate(method='nearest')

填充法 - 高级方法

# 按组填充
df['price'] = df.groupby('symbol')['price'].transform(
    lambda x: x.fillna(x.mean())
)

# 滚动窗口填充
df['price_ma_filled'] = df.groupby('symbol')['price'].transform(
    lambda x: x.fillna(x.rolling(5, min_periods=1).mean())
)

# 基于相关性填充
def fill_by_correlation(df, target_col, corr_threshold=0.8):
    # 找出高相关性列
    corr_matrix = df.corr()
    high_corr_cols = corr_matrix[target_col][
        abs(corr_matrix[target_col]) > corr_threshold
    ].index.tolist()

    # 使用这些列填充
    for col in high_corr_cols:
        df[target_col] = df[target_col].fillna(
            df.groupby(col)[target_col].transform('mean')
        )

    return df

# 机器学习填充
from sklearn.impute import KNNImputer, SimpleImputer

# KNN填充
imputer_knn = KNNImputer(n_neighbors=5)
df_imputed = pd.DataFrame(
    imputer_knn.fit_transform(df.select_dtypes(include=[np.number])),
    columns=df.select_dtypes(include=[np.number]).columns
)

# 多重插补
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer

imputer_mice = IterativeImputer(max_iter=10)
df_mice = pd.DataFrame(
    imputer_mice.fit_transform(df.select_dtypes(include=[np.number])),
    columns=df.select_dtypes(include=[np.number]).columns
)

📈 金融数据缺失值处理

价格数据缺失处理

# 价格序列缺失值处理
def handle_price_missing(df, price_col='close'):
    df_copy = df.copy()

    # 使用前值填充(价格通常延续)
    df_copy[price_col] = df_copy[price_col].fillna(method='ffill')

    # 如果开头有缺失,使用后值填充
    df_copy[price_col] = df_copy[price_col].fillna(method='bfill')

    return df_copy

# 停牌期间处理
def handle_suspended_data(df, price_col='close', volume_col='volume'):
    df_copy = df.copy()

    # 停牌期间成交量为0
    df_copy[volume_col] = df_copy[volume_col].fillna(0)

    # 停牌期间价格维持前值
    df_copy[price_col] = df_copy.groupby('symbol')[price_col].transform(
        lambda x: x.fillna(method='ffill')
    )

    return df_copy

收益率数据缺失处理

# 收益率缺失处理(通常设为0)
df['returns'] = df['returns'].fillna(0)

# 或者跳过缺失日
df_non_missing = df.dropna(subset=['returns'])

# 使用行业平均收益率填充
df['returns'] = df.groupby(['industry', 'date'])['returns'].transform(
    lambda x: x.fillna(x.mean())
)

时序数据插值

# 时间序列插值
df.set_index('date', inplace=True)

# 线性插值(适合连续变化数据)
df['price_interpolated'] = df['price'].interpolate(method='time')

# 分段线性插值(考虑停牌等特殊情况)
def piecewise_linear_interpolation(series, window=5):
    # 在非缺失窗口内插值
    result = series.copy()

    for i in range(len(series)):
        if pd.isna(series.iloc[i]):
            # 寻找前后非缺失值
            prev_idx = i - 1
            while prev_idx >= 0 and pd.isna(series.iloc[prev_idx]):
                prev_idx -= 1

            next_idx = i + 1
            while next_idx < len(series) and pd.isna(series.iloc[next_idx]):
                next_idx += 1

            # 在窗口范围内插值
            if prev_idx >= i - window and next_idx <= i + window:
                result.iloc[i] = series.iloc[i-1:i+2].interpolate().iloc[1]

    return result

df['price_piecewise'] = piecewise_linear_interpolation(df['price'])
⚠️
注意事项

1. 删除数据前要评估样本量是否充足
2. 填充方法要根据数据特性选择
3. 记录缺失值处理过程,便于追溯
4. 注意填充可能引入的偏差
5. 对关键指标进行敏感性分析

💡
最佳实践

1. 先分析缺失机制(MCAR/MAR/MNAR)
2. 保留原始数据,处理使用副本
3. 多种方法对比,选择最适合的
4. 评估填充方法对后续分析的影响
5. 建立缺失值处理流水线,保持一致性

📝 本节小结

  • • 理解了金融数据缺失值的类型和成因
  • • 掌握了缺失值检测和可视化方法
  • • 学会了多种缺失值处理策略
  • • 掌握了金融数据特定场景的缺失值处理