news 2026/7/6 0:39:01

ETDataset 数据集预处理实战:从原始CSV到PyTorch DataLoader的5个关键步骤

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
ETDataset 数据集预处理实战:从原始CSV到PyTorch DataLoader的5个关键步骤

ETDataset 数据集预处理实战:从原始CSV到PyTorch DataLoader的5个关键步骤

电力变压器温度预测是能源管理领域的重要课题,ETDataset作为业内广泛使用的时间序列数据集,为研究者提供了丰富的实战素材。本文将手把手带你完成从原始CSV文件到可投入模型训练的PyTorch DataLoader的全流程,重点解决实际工程中的五个核心挑战。

1. 数据加载与初步探索

任何数据预处理流程的第一步都是理解数据结构和内容。ETDataset通常以CSV格式存储,包含分钟级(ETTm)和小时级(ETTh)两种时间粒度。我们先使用pandas加载数据并检查基本特征:

import pandas as pd # 加载小时级数据示例 df = pd.read_csv('ETTh1.csv') print(f"数据集形状: {df.shape}") print(df.head()) # 检查数据类型和缺失值 print("\n数据类型统计:") print(df.dtypes) print("\n缺失值统计:") print(df.isnull().sum())

典型输出会显示8个特征列:

  • date: 时间戳
  • HUFL/HULL: 高使用率有效/无效负载
  • MUFL/MULL: 中等使用率有效/无效负载
  • LUFL/LULL: 低使用率有效/无效负载
  • OT: 油温(预测目标)

注意:实际项目中建议使用pathlib代替直接字符串路径,增强代码可移植性。例如:

from pathlib import Path data_path = Path('data/ETTh1.csv')

时间序列数据的探索性分析(EDA)至关重要。快速绘制特征分布和趋势图:

import matplotlib.pyplot as plt df['OT'].plot(title='Oil Temperature Trend') plt.xlabel('Time') plt.ylabel('Temperature') plt.show()

2. 时间戳解析与索引处理

正确处理时间特征是时间序列分析的基础。ETDataset中的date列需要转换为datetime类型并设为索引:

df['date'] = pd.to_datetime(df['date']) df.set_index('date', inplace=True) # 检查时间序列连续性 time_diff = df.index.to_series().diff() print(f"时间间隔统计:\n{time_diff.value_counts()}")

对于存在不规则间隔的数据,我们需要进行重采样。例如将分钟级数据统一为15分钟间隔:

# 只适用于ETTm数据集 df_resampled = df.resample('15T').mean()

处理大型数据集时,可以考虑使用dask库进行分布式加载:

import dask.dataframe as dd ddf = dd.read_csv('ETTh1.csv', parse_dates=['date']) ddf = ddf.set_index('date')

3. 特征工程与归一化

电力负载特征通常存在量纲差异,需要进行标准化处理。我们使用Scikit-learn的ColumnTransformer实现差异化处理:

from sklearn.preprocessing import StandardScaler, MinMaxScaler from sklearn.compose import ColumnTransformer # 定义特征分组 load_features = ['HUFL', 'HULL', 'MUFL', 'MULL', 'LUFL', 'LULL'] target_feature = ['OT'] # 创建转换器 preprocessor = ColumnTransformer( transformers=[ ('load', StandardScaler(), load_features), ('temp', MinMaxScaler(), target_feature) ], remainder='passthrough' ) # 应用转换 df_scaled = pd.DataFrame( preprocessor.fit_transform(df), columns=load_features + target_feature, index=df.index )

对于时间序列数据,我们通常还需要创建滞后特征:

# 创建24小时滞后特征 for feature in load_features: df_scaled[f'{feature}_lag24'] = df_scaled[feature].shift(24) # 删除因滞后产生的缺失值 df_scaled.dropna(inplace=True)

4. 缺失值处理与异常检测

尽管ETDataset已经过预处理,实际项目中仍需处理数据质量问题。以下是常见的处理方法:

缺失值处理策略对比表

方法适用场景优点缺点
线性插值少量连续缺失保持趋势可能低估波动
前向填充设备短暂故障简单快速延长异常影响
季节性均值周期性数据保留周期特征忽略近期变化
模型预测大量缺失精度高计算成本高

异常值检测可以使用滑动窗口统计法:

def detect_anomalies(series, window=24, sigma=3): rolling_mean = series.rolling(window).mean() rolling_std = series.rolling(window).std() return series[(series - rolling_mean).abs() > sigma * rolling_std] anomalies = detect_anomalies(df_scaled['OT']) print(f"检测到异常值数量: {len(anomalies)}")

5. 构建时序样本与DataLoader

时间序列预测需要将数据转换为监督学习格式。我们实现一个灵活的滑动窗口生成器:

import torch from torch.utils.data import Dataset, DataLoader class ETDataset(Dataset): def __init__(self, data, window_size=24, horizon=12): self.data = torch.FloatTensor(data.values) self.window_size = window_size self.horizon = horizon def __len__(self): return len(self.data) - self.window_size - self.horizon + 1 def __getitem__(self, idx): x = self.data[idx:idx+self.window_size, :-1] # 除最后一列外的所有特征 y = self.data[idx+self.window_size:idx+self.window_size+self.horizon, -1] # 最后一列是目标 return x, y # 示例使用 dataset = ETDataset(df_scaled, window_size=24*7, horizon=24) # 使用一周数据预测下一天 dataloader = DataLoader(dataset, batch_size=32, shuffle=True) # 检查一个批次 for x, y in dataloader: print(f"输入形状: {x.shape}, 输出形状: {y.shape}") break

提示:对于多GPU训练,可以使用DistributedSampler优化数据加载:

from torch.utils.data.distributed import DistributedSampler sampler = DistributedSampler(dataset) dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)

完整的数据处理流程还应包括数据集分割。时间序列需要按时间顺序划分:

# 按时间划分训练/验证/测试集 train_size = int(0.7 * len(df_scaled)) val_size = int(0.2 * len(df_scaled)) train_data = df_scaled.iloc[:train_size] val_data = df_scaled.iloc[train_size:train_size+val_size] test_data = df_scaled.iloc[train_size+val_size:] print(f"数据集划分: 训练集 {len(train_data)}, 验证集 {len(val_data)}, 测试集 {len(test_data)}")

高级技巧与性能优化

处理大型时间序列数据集时,内存和计算效率至关重要。以下是几个实用技巧:

内存映射技术:对于超大型数据集,可以使用numpy的memmap功能:

import numpy as np # 将数据保存为memmap格式 fp = np.memmap('temp.mmap', dtype='float32', mode='w+', shape=df_scaled.values.shape) fp[:] = df_scaled.values[:] del fp # 刷新到磁盘 # 后续加载 data = np.memmap('temp.mmap', dtype='float32', mode='r', shape=df_scaled.values.shape)

并行预处理:使用joblib加速特征工程:

from joblib import Parallel, delayed def process_feature(col): return df[col].rolling(24).mean() results = Parallel(n_jobs=4)(delayed(process_feature)(col) for col in load_features)

自定义DataLoader:实现更高效的时间序列采样:

class SequentialSampler(torch.utils.data.Sampler): def __init__(self, data_source, stride=1): self.data_source = data_source self.stride = stride def __iter__(self): n = len(self.data_source) return iter(range(0, n, self.stride)) def __len__(self): return len(range(0, len(self.data_source), self.stride)) sampler = SequentialSampler(dataset, stride=12) dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)

工程化部署考虑

将预处理流程产品化时,需要关注以下方面:

  1. 可复现性:使用joblib保存预处理管道

    from joblib import dump dump(preprocessor, 'preprocessor.joblib')
  2. 监控数据漂移:定期检查特征分布变化

    from scipy import stats def detect_drift(reference, current, alpha=0.05): pvalues = [] for col in reference.columns: _, p = stats.ks_2samp(reference[col], current[col]) pvalues.append(p > alpha) return sum(pvalues) / len(pvalues)
  3. 自动化测试:为数据质量添加断言检查

    assert df_scaled.isnull().sum().sum() == 0, "存在缺失值未处理" assert (df_scaled.index == pd.Series(df_scaled.index).drop_duplicates()).all(), "索引不唯一"
  4. 性能基准:记录各步骤耗时

    from time import perf_counter class Timer: def __enter__(self): self.start = perf_counter() return self def __exit__(self, *args): self.end = perf_counter() self.duration = self.end - self.start with Timer() as t: preprocessor.fit_transform(df) print(f"预处理耗时: {t.duration:.2f}秒")
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/7/6 0:31:25

ELK Stack 安全加固:Kibana 7.6.1 启用 X-Pack 认证的 5 个关键步骤

ELK Stack 安全加固实战:从零构建企业级认证体系 为什么生产环境必须启用X-Pack安全模块 在数字化转型浪潮中,ELK Stack已成为企业日志管理和数据分析的事实标准。但许多团队在开发测试阶段往往忽略了一个关键环节——安全认证。我曾亲眼见证一家金融科…

作者头像 李华
网站建设 2026/7/6 0:14:24

3步颠覆性数据自主方案:如何让微信对话成为你的个人数字资产

3步颠覆性数据自主方案:如何让微信对话成为你的个人数字资产 【免费下载链接】WeChatMsg 提取微信聊天记录,将其导出成HTML、Word、CSV文档永久保存,对聊天记录进行分析生成年度聊天报告 项目地址: https://gitcode.com/GitHub_Trending/we…

作者头像 李华
网站建设 2026/7/6 0:12:33

DQN 算法实战:CartPole-v0 环境 1000 轮训练实现 200 分满分

DQN算法实战:从零构建CartPole智能体的完整指南1. 环境准备与基础概念在开始构建DQN智能体之前,我们需要先理解几个核心概念。CartPole-v0是OpenAI Gym中的一个经典控制问题,目标是让小车上的杆子保持直立不倒下。这个环境有四个状态变量&…

作者头像 李华
网站建设 2026/7/6 0:12:02

位置编码外推实战:从BERT 512到26万token的3种延拓策略

位置编码外推实战:从BERT 512到26万token的3种延拓策略当处理长文本序列时,BERT等Transformer模型面临一个根本性限制——位置编码的长度约束。传统BERT模型最多只能处理512个token,这严重制约了其在长文档理解、基因组分析等场景的应用潜力。…

作者头像 李华
网站建设 2026/7/6 0:11:58

终极iOS降级指南:用downr1n解锁旧版系统自由

终极iOS降级指南:用downr1n解锁旧版系统自由 【免费下载链接】downr1n downgrade tethered checkm8 idevices ios 14, 15. 项目地址: https://gitcode.com/gh_mirrors/do/downr1n 你是否曾因iOS系统升级后悔莫及?新版本卡顿、耗电快,或…

作者头像 李华
网站建设 2026/7/6 0:10:35

数据结构06-树结构

一、树的介绍① 树:由一个根节点和若干个分支节点构成的具有一对多关系的数据的集合。每一棵树必有一特定的节点,称做根节点(root);根节点之下可以有零个以上的子节点(可以没有),而各子节点也可为子树,拥有…

作者头像 李华