from typing import List
import numpy as np
import pandas as pd
from pandas.tseries import offsets
from pandas.tseries.frequencies import to_offset
import torch
from .base import BaseDataset, register_dataset, get_data_dir
"""
Expected file organization:
[data_dir]
ETTh1.csv
ETTh2.csv
ETTm1.csv
ETTm2.csv
exchange_rate.csv
electricity.csv
national_illness.csv
traffic.csv
"""
class TimeFeature:
def __init__(self):
pass
def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
pass
def __repr__(self):
return self.__class__.__name__ + "()"
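# Each TimeFeature subclass below maps a pd.DatetimeIndex to floats in [-0.5, 0.5].
# A minimal illustrative sketch (not part of the module API):
#
#     idx = pd.date_range("2021-01-01", periods=3, freq="h")
#     HourOfDay()(idx)  # array([-0.5, -0.45652174, -0.41304348])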
class SecondOfMinute(TimeFeature):
"""Minute of hour encoded as value between [-0.5, 0.5]"""
def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
return index.second / 59.0 - 0.5
class MinuteOfHour(TimeFeature):
"""Minute of hour encoded as value between [-0.5, 0.5]"""
def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
return index.minute / 59.0 - 0.5
class HourOfDay(TimeFeature):
"""Hour of day encoded as value between [-0.5, 0.5]"""
def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
return index.hour / 23.0 - 0.5
class DayOfWeek(TimeFeature):
"""Hour of day encoded as value between [-0.5, 0.5]"""
def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
return index.dayofweek / 6.0 - 0.5
class DayOfMonth(TimeFeature):
"""Day of month encoded as value between [-0.5, 0.5]"""
def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
return (index.day - 1) / 30.0 - 0.5
class DayOfYear(TimeFeature):
"""Day of year encoded as value between [-0.5, 0.5]"""
def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
return (index.dayofyear - 1) / 365.0 - 0.5
class MonthOfYear(TimeFeature):
"""Month of year encoded as value between [-0.5, 0.5]"""
def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
return (index.month - 1) / 11.0 - 0.5
class WeekOfYear(TimeFeature):
"""Week of year encoded as value between [-0.5, 0.5]"""
def __call__(self, index: pd.DatetimeIndex) -> np.ndarray:
return (index.isocalendar().week - 1) / 52.0 - 0.5
def time_features_from_frequency_str(freq_str: str) -> List[TimeFeature]:
"""Returns a list of time features that will be appropriate for the given frequency string.
Args:
freq_str (str): Frequency string of the form [multiple][granularity] such as "12H", "5min", "1D" etc.
"""
features_by_offsets = {
offsets.YearEnd: [],
offsets.QuarterEnd: [MonthOfYear],
offsets.MonthEnd: [MonthOfYear],
offsets.Week: [DayOfMonth, WeekOfYear],
offsets.Day: [DayOfWeek, DayOfMonth, DayOfYear],
offsets.BusinessDay: [DayOfWeek, DayOfMonth, DayOfYear],
offsets.Hour: [HourOfDay, DayOfWeek, DayOfMonth, DayOfYear],
offsets.Minute: [
MinuteOfHour,
HourOfDay,
DayOfWeek,
DayOfMonth,
DayOfYear,
],
offsets.Second: [
SecondOfMinute,
MinuteOfHour,
HourOfDay,
DayOfWeek,
DayOfMonth,
DayOfYear,
],
}
offset = to_offset(freq_str)
for offset_type, feature_classes in features_by_offsets.items():
if isinstance(offset, offset_type):
return [cls() for cls in feature_classes]
supported_freq_msg = f"""
Unsupported frequency {freq_str}
The following frequencies are supported:
Y - yearly
alias: A
Q - quarterly
M - monthly
W - weekly
D - daily
B - business days
H - hourly
T - minutely
alias: min
S - secondly
"""
raise RuntimeError(supported_freq_msg)
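# Illustrative sketch of the mapping above (hourly index assumed; not part of the
# module API): "h" resolves to offsets.Hour, so four features are returned and can
# be stacked into a (time, feature) array.
#
#     idx = pd.date_range("2021-01-01", periods=24, freq="h")
#     feats = time_features_from_frequency_str("h")
#     # -> [HourOfDay(), DayOfWeek(), DayOfMonth(), DayOfYear()]
#     marks = np.vstack([f(idx) for f in feats]).transpose(1, 0)  # shape (24, 4)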
def time_features(dates, timeenc=1, freq="h"):
"""
``time_features`` takes in a ``dates`` dataframe with a ``'date'`` column and extracts calendar fields down to ``freq``, where ``freq`` can be any of the following when ``timeenc`` is 0:
.. code-block:: python
m - [month]
w - [month]
d - [month, day, weekday]
b - [month, day, weekday]
h - [month, day, weekday, hour]
t - [month, day, weekday, hour, *minute]
If ``timeenc`` is 1, a similar but different set of ``freq`` values is supported (all features encoded in the range [-0.5, 0.5]):
.. code-block:: python
Q - [month]
M - [month]
W - [Day of month, week of year]
D - [Day of week, day of month, day of year]
B - [Day of week, day of month, day of year]
H - [Hour of day, day of week, day of month, day of year]
T - [Minute of hour, hour of day, day of week, day of month, day of year]
S - [Second of minute, minute of hour, hour of day, day of week, day of month, day of year]
With ``timeenc`` set to 0, ``minute`` returns a number from 0 to 3 corresponding to the 15-minute period the timestamp falls into.
"""
if timeenc == 0:
dates["month"] = dates.date.apply(lambda row: row.month, 1)
dates["day"] = dates.date.apply(lambda row: row.day, 1)
dates["weekday"] = dates.date.apply(lambda row: row.weekday(), 1)
dates["hour"] = dates.date.apply(lambda row: row.hour, 1)
dates["minute"] = dates.date.apply(lambda row: row.minute, 1)
dates["minute"] = dates.minute.map(lambda x: x // 15)
freq_map = {
"y": [],
"m": ["month"],
"w": ["month"],
"d": ["month", "day", "weekday"],
"b": ["month", "day", "weekday"],
"h": ["month", "day", "weekday", "hour"],
"t": ["month", "day", "weekday", "hour", "minute"],
}
return dates[freq_map[freq.lower()]].values
if timeenc == 1:
dates = pd.to_datetime(dates.date.values)
return np.vstack([feat(dates) for feat in time_features_from_frequency_str(freq)]).transpose(1, 0)
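# Illustrative sketch of both encodings on a toy frame (not part of the module API).
# With timeenc=0 the result holds integer calendar fields; with timeenc=1 it holds
# the normalized features from time_features_from_frequency_str. Note the timeenc=0
# branch adds columns to the input frame, hence the .copy() below.
#
#     df = pd.DataFrame({"date": pd.date_range("2021-01-01", periods=4, freq="h")})
#     time_features(df.copy(), timeenc=0, freq="h")  # shape (4, 4): month, day, weekday, hour
#     time_features(df.copy(), timeenc=1, freq="h")  # shape (4, 4): values in [-0.5, 0.5]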
class StandardScaler:
def __init__(self, mean, std):
self.mean = mean
self.std = std
def inverse_transform(self, data):
mean = torch.from_numpy(self.mean).type_as(data).to(data.device) if torch.is_tensor(data) else self.mean
std = torch.from_numpy(self.std).type_as(data).to(data.device) if torch.is_tensor(data) else self.std
return (data * std) + mean
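# Illustrative sketch (not part of the module API): the scaler keeps per-column
# mean/std as numpy arrays and can undo normalization for numpy arrays and torch
# tensors alike.
#
#     scaler = StandardScaler(mean=np.array([10.0]), std=np.array([2.0]))
#     scaler.inverse_transform(np.array([[0.5], [-0.5]]))      # [[11.], [9.]]
#     scaler.inverse_transform(torch.tensor([[0.5], [-0.5]]))  # tensor([[11.], [9.]])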
@register_dataset
class LTSF(BaseDataset):
"""
这是一个用于长时序预测的时间序列数据集
该数据集包含电力、经济、医疗、交通等不同领域下的长期时间序列数据。更多信息参考:https://github.com/thuml/Autoformer
Args:
data_name (str): 具体的数据名字,包括:``ETTh1``,``ETTh2``,``ETTm1``,``ETTm2``,``exchange_rate``,``electricity``,``national_illness``,``traffic``
split (str): 数据集划分形式,包括:训练集(``train``)或者验证集(``val``)
seq_len (int): 描述时间序列数据的序列长度
label_len (int): 描述时间序列数据的标签长度
pred_len (int): 描述时间序列数据的预测长度
features (str): 预测的维度,包括:多维预测多维(``M``),单维预测单维(``S``),多维预测单维(``MS``)
target (str): 具体要预测的指标列,当 ``features`` 为 ``S`` 或者 ``MS`` 时奏效
timeenc (int): 不同的时间特征编码,默认为 ``0``
freq (str): 时间特征编码的频率, 包括:每秒(``s``),每分钟(``t``),每小时(``h``),每天(``d``),每工作日(``b``),每星期(``w``),每月(``m``)。你也可以选择更定制的频次,例如:``15min`` 或者 ``3h``
Returns:
seq_x, seq_y, seq_x_mask, seq_y_mask (np.ndarray, np.ndarray, np.ndarray, np.ndarray): 返回的每个样本是一个四元组,包括历史指标序列,未来指标序列,历史时间位置信息编码,未来时间位置信息编码
Examples:
.. code-block:: python
from hfai.datasets import LTSF
dataset = LTSF(data_name, split, seq_len, label_len, pred_len, features, target, timeenc, freq)
loader = dataset.loader(batch_size=64, num_workers=4)
for seq_x, seq_y, seq_x_mask, seq_y_mask in loader:
# training model
"""
def __init__(
self,
data_name: str,
split: str,
seq_len=96,
label_len=48,
pred_len=24,
features="S",
target="OT",
timeenc=0,
freq="h",
) -> None:
super(LTSF, self).__init__()
assert data_name in [
"ETTh1",
"ETTh2",
"ETTm1",
"ETTm2",
"exchange_rate",
"electricity",
"national_illness",
"traffic",
]
self.data_path = str(get_data_dir() / "LongTermSeriesData" / f"{data_name}.csv")
assert split in ["train", "val"]
self.split = split
self.seq_len = seq_len
self.label_len = label_len
self.pred_len = pred_len
self.features = features
self.target = target
self.timeenc = timeenc
self.freq = freq
self.dataset = self.__load_dataset__()
def __len__(self):
return len(self.dataset)
def __load_dataset__(self):
df_raw = pd.read_csv(self.data_path)
num_train = int(len(df_raw) * 0.8)
num_val = len(df_raw) - num_train
if self.split == "train":
split_start, split_end = 0, num_train
elif self.split == "val":
split_start, split_end = num_train - self.seq_len, num_train + num_val
else:
raise ValueError("invalid parameters: split: {}".format(self.split))
if self.features == "M" or self.features == "MS":
cols_data = df_raw.columns[1:]
df_data = df_raw[cols_data]
elif self.features == "S":
df_data = df_raw[[self.target]]
else:
raise ValueError("invalid parameters: features: {}".format(self.features))
mean, std = df_data.values.mean(0), df_data.values.std(0)
data = (df_data.values - mean) / std
self.scaler = StandardScaler(mean, std)
data_x = data[split_start:split_end]
data_y = df_data.values[split_start:split_end]
df_stamp = df_raw[["date"]][split_start:split_end]
df_stamp["date"] = pd.to_datetime(df_stamp.date)
data_stamp = time_features(df_stamp, timeenc=self.timeenc, freq=self.freq)
dataset = []
for i in range(len(data_x) - self.seq_len - self.pred_len + 1):
s_begin = i
s_end = s_begin + self.seq_len
r_begin = s_end - self.label_len
r_end = r_begin + self.label_len + self.pred_len
seq_x = data_x[s_begin:s_end]
seq_y = np.concatenate(
[data_x[r_begin : r_begin + self.label_len], data_y[r_begin + self.label_len : r_end]], 0
)
seq_x_mark = data_stamp[s_begin:s_end]
seq_y_mark = data_stamp[r_begin:r_end]
dataset.append((seq_x, seq_y, seq_x_mark, seq_y_mark))
return dataset
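# Window layout sketch for one sample produced above (with the defaults
# seq_len=96, label_len=48, pred_len=24): for a window starting at index i,
#   seq_x spans [i, i + 96)         -- normalized history
#   seq_y spans [i + 48, i + 120)   -- last 48 normalized history steps followed
#                                      by 24 un-normalized future steps
# so seq_y overlaps the tail of seq_x by label_len steps and extends pred_len
# steps beyond it.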
def __getitem__(self, indices):
samples = []
for i in indices:
item = self.dataset[i]
samples.append(item)
return samples
def get_scaler(self):
    """Returns the normalization statistics of the LTSF data.

    Returns:
        A scaler object holding the per-column mean (``mean``) and standard deviation (``std``) of the data.
    """
return self.scaler
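# Illustrative end-to-end sketch (the forecasting model here is hypothetical and
# not part of this module): predictions made on normalized inputs can be mapped
# back to the original scale with the scaler returned by get_scaler().
#
#     dataset = LTSF("ETTh1", "val", seq_len=96, label_len=48, pred_len=24, features="S")
#     scaler = dataset.get_scaler()
#     loader = dataset.loader(batch_size=64, num_workers=4)
#     for seq_x, seq_y, seq_x_mark, seq_y_mark in loader:
#         pred = model(seq_x, seq_x_mark)  # hypothetical model
#         pred_unscaled = scaler.inverse_transform(pred)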